sdjournal/
query.rs

1use crate::cursor::Cursor;
2use crate::entry::{EntryOwned, EntryRef};
3use crate::error::{LimitKind, Result, SdJournalError};
4use crate::file::{DataEntryOffsetIter, DataObjectRef, EntryMeta, FileEntryIter};
5use crate::follow::Follow;
6use crate::journal::Journal;
7use crate::util::is_ascii_field_name;
8use std::cmp::Reverse;
9use std::collections::BinaryHeap;
10
/// One predicate that an entry has to satisfy to match a query branch.
#[derive(Debug, Clone)]
enum MatchTerm {
    /// The entry contains `field` with exactly `value`.
    ///
    /// `payload` holds the pre-joined `FIELD=value` bytes that are handed to
    /// `find_data_object` when the term is resolved against a journal file.
    Exact {
        field: String,
        value: Vec<u8>,
        payload: Vec<u8>,
    },
    /// The entry contains `field` with any value.
    Present { field: String },
}
22
23/// A query builder for reading entries from a `Journal`.
24#[derive(Clone)]
25pub struct JournalQuery {
26    journal: Journal,
27
28    global_terms: Vec<MatchTerm>,
29    or_groups: Vec<Vec<MatchTerm>>,
30
31    since_realtime: Option<u64>,
32    until_realtime: Option<u64>,
33    cursor_start: Option<(Cursor, bool)>, // (cursor, inclusive)
34    reverse: bool,
35    limit: Option<usize>,
36    invalid_reason: Option<String>,
37    too_many_terms: bool,
38}
39
40impl JournalQuery {
41    pub(crate) fn new(journal: Journal) -> Self {
42        Self {
43            journal,
44            global_terms: Vec::new(),
45            or_groups: Vec::new(),
46            since_realtime: None,
47            until_realtime: None,
48            cursor_start: None,
49            reverse: false,
50            limit: None,
51            invalid_reason: None,
52            too_many_terms: false,
53        }
54    }
55
56    pub fn match_exact(&mut self, field: &str, value: &[u8]) -> &mut Self {
57        if self.invalid_reason.is_some() {
58            return self;
59        }
60        if let Err(e) = validate_field_name(field, &self.journal.inner.config) {
61            self.invalid_reason = Some(e.to_string());
62            return self;
63        }
64        if self.count_terms() >= self.journal.inner.config.max_query_terms {
65            self.too_many_terms = true;
66            return self;
67        }
68
69        let mut payload =
70            Vec::with_capacity(field.len().saturating_add(1).saturating_add(value.len()));
71        payload.extend_from_slice(field.as_bytes());
72        payload.push(b'=');
73        payload.extend_from_slice(value);
74
75        self.global_terms.push(MatchTerm::Exact {
76            field: field.to_string(),
77            value: value.to_vec(),
78            payload,
79        });
80        self
81    }
82
83    pub fn match_present(&mut self, field: &str) -> &mut Self {
84        if self.invalid_reason.is_some() {
85            return self;
86        }
87        if let Err(e) = validate_field_name(field, &self.journal.inner.config) {
88            self.invalid_reason = Some(e.to_string());
89            return self;
90        }
91        if self.count_terms() >= self.journal.inner.config.max_query_terms {
92            self.too_many_terms = true;
93            return self;
94        }
95
96        self.global_terms.push(MatchTerm::Present {
97            field: field.to_string(),
98        });
99        self
100    }
101
102    pub fn or_group<F>(&mut self, f: F) -> &mut Self
103    where
104        F: FnOnce(&mut OrGroupBuilder),
105    {
106        if self.invalid_reason.is_some() {
107            return self;
108        }
109        let remaining = self
110            .journal
111            .inner
112            .config
113            .max_query_terms
114            .saturating_sub(self.count_terms());
115        let mut b = OrGroupBuilder {
116            terms: Vec::new(),
117            config: self.journal.inner.config.clone(),
118            invalid_reason: None,
119            too_many_terms: false,
120            remaining,
121        };
122        f(&mut b);
123        if let Some(r) = b.invalid_reason {
124            self.invalid_reason = Some(r);
125            return self;
126        }
127        if b.too_many_terms {
128            self.too_many_terms = true;
129            return self;
130        }
131        if !b.terms.is_empty() {
132            self.or_groups.push(b.terms);
133        }
134        self
135    }
136
137    pub fn since_realtime(&mut self, usec: u64) -> &mut Self {
138        self.since_realtime = Some(usec);
139        self
140    }
141
142    pub fn until_realtime(&mut self, usec: u64) -> &mut Self {
143        self.until_realtime = Some(usec);
144        self
145    }
146
147    pub fn after_cursor(&mut self, cursor: Cursor) -> &mut Self {
148        self.cursor_start = Some((cursor, false));
149        self
150    }
151
152    /// Seek to the start of the journal (oldest entries).
153    ///
154    /// This clears any cursor-based starting position and disables `reverse`.
155    pub fn seek_head(&mut self) -> &mut Self {
156        self.cursor_start = None;
157        self.reverse = false;
158        self
159    }
160
161    /// Seek to the end of the journal (newest entries).
162    ///
163    /// This clears any cursor-based starting position and enables `reverse`.
164    pub fn seek_tail(&mut self) -> &mut Self {
165        self.cursor_start = None;
166        self.reverse = true;
167        self
168    }
169
170    pub fn reverse(&mut self, reverse: bool) -> &mut Self {
171        self.reverse = reverse;
172        self
173    }
174
175    pub fn limit(&mut self, n: usize) -> &mut Self {
176        self.limit = Some(n);
177        self
178    }
179
180    pub fn iter(&self) -> Result<impl Iterator<Item = Result<EntryRef>> + use<>> {
181        self.validate()?;
182        JournalIter::new(self.clone())
183    }
184
185    pub fn collect_owned(&self) -> Result<Vec<EntryOwned>> {
186        let mut out = Vec::new();
187        for item in self.iter()? {
188            let entry = item?;
189            out.push(entry.to_owned());
190        }
191        Ok(out)
192    }
193
194    pub fn follow(&self) -> Result<Follow> {
195        self.validate()?;
196        self.validate_follow()?;
197
198        let roots = self.journal.inner.roots.clone();
199        let config = self.journal.inner.config.clone();
200
201        let live_journal = Journal::open_dirs_with_config(&roots, config.clone())?;
202        let mut template = self.with_journal(live_journal.clone());
203        template.limit = None;
204
205        let mut catchup_query = self.with_journal(live_journal);
206        let mut last_cursor: Option<Cursor> = None;
207
208        let has_lower_bound = self.cursor_start.is_some() || self.since_realtime.is_some();
209        if !has_lower_bound {
210            let mut tail_probe = template.clone();
211            tail_probe.reverse(true);
212            tail_probe.limit(1);
213
214            for item in tail_probe.iter()? {
215                match item {
216                    Ok(entry) => {
217                        let c = entry.cursor()?;
218                        catchup_query.set_cursor_start(c.clone(), false)?;
219                        last_cursor = Some(c);
220                        break;
221                    }
222                    Err(_) => continue,
223                }
224            }
225        }
226
227        let catchup_iter: Box<dyn Iterator<Item = Result<EntryRef>> + Send> =
228            Box::new(catchup_query.iter()?);
229        Ok(Follow::new(
230            roots,
231            config,
232            template,
233            catchup_iter,
234            last_cursor,
235        ))
236    }
237
238    /// Create an async follow adapter for Tokio.
239    #[cfg(feature = "tokio")]
240    pub fn follow_tokio(&self) -> Result<crate::follow::TokioFollow> {
241        Ok(crate::follow::TokioFollow::spawn(self.follow()?))
242    }
243
244    pub(crate) fn set_cursor_start(&mut self, cursor: Cursor, inclusive: bool) -> Result<()> {
245        self.cursor_start = Some((cursor, inclusive));
246        Ok(())
247    }
248
249    pub(crate) fn with_journal(&self, journal: Journal) -> Self {
250        let mut q = self.clone();
251        q.journal = journal;
252        q
253    }
254
255    fn validate(&self) -> Result<()> {
256        if let Some(reason) = &self.invalid_reason {
257            return Err(SdJournalError::InvalidQuery {
258                reason: reason.clone(),
259            });
260        }
261        if self.too_many_terms {
262            return Err(SdJournalError::LimitExceeded {
263                kind: LimitKind::QueryTerms,
264                limit: u64::try_from(self.journal.inner.config.max_query_terms).unwrap_or(u64::MAX),
265            });
266        }
267
268        if let (Some(since), Some(until)) = (self.since_realtime, self.until_realtime)
269            && since > until
270        {
271            return Err(SdJournalError::InvalidQuery {
272                reason: "since_realtime must be <= until_realtime".to_string(),
273            });
274        }
275
276        Ok(())
277    }
278
279    fn validate_follow(&self) -> Result<()> {
280        if self.reverse {
281            return Err(SdJournalError::InvalidQuery {
282                reason: "follow() requires reverse=false".to_string(),
283            });
284        }
285        if self.until_realtime.is_some() {
286            return Err(SdJournalError::InvalidQuery {
287                reason: "follow() does not allow until_realtime".to_string(),
288            });
289        }
290        Ok(())
291    }
292
293    fn count_terms(&self) -> usize {
294        let mut n = self.global_terms.len();
295        for g in &self.or_groups {
296            n = n.saturating_add(g.len());
297        }
298        n
299    }
300}
301
302pub struct OrGroupBuilder {
303    terms: Vec<MatchTerm>,
304    config: crate::config::JournalConfig,
305    invalid_reason: Option<String>,
306    too_many_terms: bool,
307    remaining: usize,
308}
309
310impl OrGroupBuilder {
311    pub fn match_exact(&mut self, field: &str, value: &[u8]) -> &mut Self {
312        if self.invalid_reason.is_some() {
313            return self;
314        }
315        if let Err(e) = validate_field_name(field, &self.config) {
316            self.invalid_reason = Some(e.to_string());
317            return self;
318        }
319        if self.terms.len() >= self.remaining {
320            self.too_many_terms = true;
321            return self;
322        }
323        let mut payload =
324            Vec::with_capacity(field.len().saturating_add(1).saturating_add(value.len()));
325        payload.extend_from_slice(field.as_bytes());
326        payload.push(b'=');
327        payload.extend_from_slice(value);
328
329        self.terms.push(MatchTerm::Exact {
330            field: field.to_string(),
331            value: value.to_vec(),
332            payload,
333        });
334        self
335    }
336
337    pub fn match_present(&mut self, field: &str) -> &mut Self {
338        if self.invalid_reason.is_some() {
339            return self;
340        }
341        if let Err(e) = validate_field_name(field, &self.config) {
342            self.invalid_reason = Some(e.to_string());
343            return self;
344        }
345        if self.terms.len() >= self.remaining {
346            self.too_many_terms = true;
347            return self;
348        }
349        self.terms.push(MatchTerm::Present {
350            field: field.to_string(),
351        });
352        self
353    }
354}
355
356fn validate_field_name(field: &str, config: &crate::config::JournalConfig) -> Result<()> {
357    if field.len() > config.max_field_name_len {
358        return Err(SdJournalError::InvalidQuery {
359            reason: "field name too long".to_string(),
360        });
361    }
362    if !is_ascii_field_name(field.as_bytes()) {
363        return Err(SdJournalError::InvalidQuery {
364            reason: "field name must be ASCII and must not contain '='".to_string(),
365        });
366    }
367    Ok(())
368}
369
370fn build_branches(query: &JournalQuery) -> Vec<Vec<MatchTerm>> {
371    if query.or_groups.is_empty() {
372        return vec![query.global_terms.clone()];
373    }
374
375    let mut out = Vec::with_capacity(query.or_groups.len());
376    for group in &query.or_groups {
377        let mut terms = query.global_terms.clone();
378        terms.extend_from_slice(group);
379        out.push(terms);
380    }
381    out
382}
383
384enum FileMetaIter {
385    Empty,
386    Single(FileBranchIter),
387    Or(FileOrIter),
388}
389
390impl FileMetaIter {
391    fn from_branch_iters(mut iters: Vec<FileBranchIter>, reverse: bool) -> Self {
392        iters.retain(|it| !matches!(&it.kind, BranchKind::Empty));
393        match iters.len() {
394            0 => FileMetaIter::Empty,
395            1 => FileMetaIter::Single(iters.remove(0)),
396            _ => FileMetaIter::Or(FileOrIter::new(iters, reverse)),
397        }
398    }
399}
400
401impl Iterator for FileMetaIter {
402    type Item = Result<EntryMeta>;
403
404    fn next(&mut self) -> Option<Self::Item> {
405        match self {
406            FileMetaIter::Empty => None,
407            FileMetaIter::Single(it) => it.next(),
408            FileMetaIter::Or(it) => it.next(),
409        }
410    }
411}
412
413struct FileOrIter {
414    reverse: bool,
415    forward_heap: BinaryHeap<Reverse<FileOrHeapItem>>,
416    reverse_heap: BinaryHeap<FileOrHeapItem>,
417    iters: Vec<FileBranchIter>,
418    pending_errors: Vec<SdJournalError>,
419    done: bool,
420}
421
422#[derive(Clone, Copy)]
423struct FileOrHeapItem {
424    meta: EntryMeta,
425    branch_idx: usize,
426}
427
428impl PartialEq for FileOrHeapItem {
429    fn eq(&self, other: &Self) -> bool {
430        self.meta == other.meta && self.branch_idx == other.branch_idx
431    }
432}
433
434impl Eq for FileOrHeapItem {}
435
436impl PartialOrd for FileOrHeapItem {
437    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
438        Some(self.cmp(other))
439    }
440}
441
442impl Ord for FileOrHeapItem {
443    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
444        self.meta.cmp_key(&other.meta)
445    }
446}
447
448impl FileOrIter {
449    fn new(mut iters: Vec<FileBranchIter>, reverse: bool) -> Self {
450        let mut pending_errors = Vec::new();
451        let mut forward_heap: BinaryHeap<Reverse<FileOrHeapItem>> = BinaryHeap::new();
452        let mut reverse_heap: BinaryHeap<FileOrHeapItem> = BinaryHeap::new();
453
454        for (idx, it) in iters.iter_mut().enumerate() {
455            if let Some(meta) = next_ok_meta(it, &mut pending_errors) {
456                let item = FileOrHeapItem {
457                    meta,
458                    branch_idx: idx,
459                };
460                if reverse {
461                    reverse_heap.push(item);
462                } else {
463                    forward_heap.push(Reverse(item));
464                }
465            }
466        }
467
468        Self {
469            reverse,
470            forward_heap,
471            reverse_heap,
472            iters,
473            pending_errors,
474            done: false,
475        }
476    }
477
478    fn pop_next(&mut self) -> Option<FileOrHeapItem> {
479        if self.reverse {
480            self.reverse_heap.pop()
481        } else {
482            self.forward_heap.pop().map(|r| r.0)
483        }
484    }
485
486    fn push_next(&mut self, item: FileOrHeapItem) {
487        if self.reverse {
488            self.reverse_heap.push(item);
489        } else {
490            self.forward_heap.push(Reverse(item));
491        }
492    }
493}
494
495impl Iterator for FileOrIter {
496    type Item = Result<EntryMeta>;
497
498    fn next(&mut self) -> Option<Self::Item> {
499        if self.done {
500            return None;
501        }
502
503        if let Some(err) = self.pending_errors.pop() {
504            return Some(Err(err));
505        }
506
507        let item = match self.pop_next() {
508            Some(item) => item,
509            None => {
510                self.done = true;
511                return None;
512            }
513        };
514
515        if let Some(next_meta) =
516            next_ok_meta(&mut self.iters[item.branch_idx], &mut self.pending_errors)
517        {
518            self.push_next(FileOrHeapItem {
519                meta: next_meta,
520                branch_idx: item.branch_idx,
521            });
522        }
523
524        Some(Ok(item.meta))
525    }
526}
527
528struct AndOffsetIter {
529    reverse: bool,
530    iters: Vec<DataEntryOffsetIter>,
531    cursors: Vec<Option<u64>>,
532    initialized: bool,
533    pending_error: Option<SdJournalError>,
534    done: bool,
535}
536
537impl AndOffsetIter {
538    fn new(iters: Vec<DataEntryOffsetIter>, reverse: bool) -> Self {
539        let cursors = vec![None; iters.len()];
540        Self {
541            reverse,
542            iters,
543            cursors,
544            initialized: false,
545            pending_error: None,
546            done: false,
547        }
548    }
549
550    fn init(&mut self) -> Option<Result<()>> {
551        if self.initialized {
552            return Some(Ok(()));
553        }
554        for i in 0..self.iters.len() {
555            match self.iters[i].next() {
556                Some(Ok(v)) => self.cursors[i] = Some(v),
557                Some(Err(e)) => return Some(Err(e)),
558                None => return None,
559            }
560        }
561        self.initialized = true;
562        Some(Ok(()))
563    }
564
565    fn target(&self) -> Option<u64> {
566        let mut it = self.cursors.iter().copied();
567        let mut target = it.next()??;
568        for v in it {
569            let v = v?;
570            target = if self.reverse {
571                target.min(v)
572            } else {
573                target.max(v)
574            };
575        }
576        Some(target)
577    }
578
579    fn advance_to(&mut self, idx: usize, target: u64) -> Option<Result<()>> {
580        loop {
581            let cur = self.cursors.get(idx).copied().flatten()?;
582
583            let needs_advance = if self.reverse {
584                cur > target
585            } else {
586                cur < target
587            };
588            if !needs_advance {
589                return Some(Ok(()));
590            }
591
592            match self.iters[idx].next() {
593                Some(Ok(v)) => self.cursors[idx] = Some(v),
594                Some(Err(e)) => return Some(Err(e)),
595                None => {
596                    self.cursors[idx] = None;
597                    return None;
598                }
599            }
600        }
601    }
602}
603
604impl Iterator for AndOffsetIter {
605    type Item = Result<u64>;
606
607    fn next(&mut self) -> Option<Self::Item> {
608        if self.done {
609            return None;
610        }
611
612        if let Some(err) = self.pending_error.take() {
613            self.done = true;
614            return Some(Err(err));
615        }
616
617        match self.init() {
618            Some(Ok(())) => {}
619            Some(Err(e)) => {
620                self.done = true;
621                return Some(Err(e));
622            }
623            None => {
624                self.done = true;
625                return None;
626            }
627        }
628
629        loop {
630            let target = match self.target() {
631                Some(v) => v,
632                None => {
633                    self.done = true;
634                    return None;
635                }
636            };
637
638            for i in 0..self.iters.len() {
639                match self.advance_to(i, target) {
640                    Some(Ok(())) => {}
641                    Some(Err(e)) => {
642                        self.done = true;
643                        return Some(Err(e));
644                    }
645                    None => {
646                        self.done = true;
647                        return None;
648                    }
649                }
650            }
651
652            let first = self.cursors.first().copied().flatten();
653            let all_equal =
654                first.is_some() && self.cursors.iter().all(|v| v.is_some() && *v == first);
655
656            if !all_equal {
657                continue;
658            }
659
660            let out = first.unwrap_or(0);
661            for i in 0..self.iters.len() {
662                match self.iters[i].next() {
663                    Some(Ok(v)) => self.cursors[i] = Some(v),
664                    Some(Err(e)) => {
665                        self.cursors[i] = None;
666                        self.pending_error = Some(e);
667                    }
668                    None => self.cursors[i] = None,
669                }
670            }
671
672            return Some(Ok(out));
673        }
674    }
675}
676
677struct FileBranchIter {
678    file: crate::file::JournalFile,
679    kind: BranchKind,
680}
681
682enum BranchKind {
683    Empty,
684    Indexed {
685        offset_iter: AndOffsetIter,
686        present_fields: Vec<String>,
687    },
688    Scan {
689        iter: FileEntryIter,
690        terms: Vec<MatchTerm>,
691    },
692}
693
694impl FileBranchIter {
695    fn new(
696        file: crate::file::JournalFile,
697        terms: Vec<MatchTerm>,
698        reverse: bool,
699        since_realtime: Option<u64>,
700        until_realtime: Option<u64>,
701    ) -> Result<Self> {
702        let mut present_fields = Vec::new();
703        let mut exact_terms = Vec::new();
704
705        for t in &terms {
706            match t {
707                MatchTerm::Exact { .. } => exact_terms.push(t),
708                MatchTerm::Present { field } => present_fields.push(field.clone()),
709            }
710        }
711
712        if exact_terms.is_empty() {
713            let iter = file.entry_iter_seek_realtime(reverse, since_realtime, until_realtime)?;
714            return Ok(Self {
715                file,
716                kind: BranchKind::Scan { iter, terms },
717            });
718        }
719
720        let mut data_refs: Vec<DataObjectRef> = Vec::new();
721        for t in &exact_terms {
722            let payload = match t {
723                MatchTerm::Exact { payload, .. } => payload.as_slice(),
724                _ => continue,
725            };
726
727            match file.find_data_object(payload) {
728                Ok(Some(d)) => data_refs.push(d),
729                Ok(None) => {
730                    return Ok(Self {
731                        file,
732                        kind: BranchKind::Empty,
733                    });
734                }
735                Err(_) => {
736                    let iter =
737                        file.entry_iter_seek_realtime(reverse, since_realtime, until_realtime)?;
738                    return Ok(Self {
739                        file,
740                        kind: BranchKind::Scan { iter, terms },
741                    });
742                }
743            }
744        }
745
746        data_refs.sort_by_key(|d| d.n_entries);
747
748        let mut iters = Vec::with_capacity(data_refs.len());
749        for d in data_refs {
750            match file.data_entry_offsets(d, reverse) {
751                Ok(it) => iters.push(it),
752                Err(_) => {
753                    let iter =
754                        file.entry_iter_seek_realtime(reverse, since_realtime, until_realtime)?;
755                    return Ok(Self {
756                        file,
757                        kind: BranchKind::Scan { iter, terms },
758                    });
759                }
760            }
761        }
762
763        Ok(Self {
764            file,
765            kind: BranchKind::Indexed {
766                offset_iter: AndOffsetIter::new(iters, reverse),
767                present_fields,
768            },
769        })
770    }
771}
772
773impl Iterator for FileBranchIter {
774    type Item = Result<EntryMeta>;
775
776    fn next(&mut self) -> Option<Self::Item> {
777        loop {
778            match &mut self.kind {
779                BranchKind::Empty => return None,
780                BranchKind::Indexed {
781                    offset_iter,
782                    present_fields,
783                } => {
784                    let entry_offset = match offset_iter.next()? {
785                        Ok(v) => v,
786                        Err(e) => {
787                            self.kind = BranchKind::Empty;
788                            return Some(Err(e));
789                        }
790                    };
791
792                    let meta = match self.file.read_entry_meta(entry_offset) {
793                        Ok(m) => m,
794                        Err(e) => return Some(Err(e)),
795                    };
796
797                    if present_fields.is_empty() {
798                        return Some(Ok(meta));
799                    }
800
801                    let owned = match self.file.read_entry_owned(entry_offset) {
802                        Ok(e) => e,
803                        Err(e) => return Some(Err(e)),
804                    };
805
806                    if present_fields.iter().all(|f| owned.get(f).is_some()) {
807                        return Some(Ok(meta));
808                    }
809
810                    continue;
811                }
812                BranchKind::Scan { iter, terms } => match iter.next()? {
813                    Ok(meta) => {
814                        if terms.is_empty() {
815                            return Some(Ok(meta));
816                        }
817
818                        let owned = match self.file.read_entry_owned(meta.entry_offset) {
819                            Ok(e) => e,
820                            Err(e) => return Some(Err(e)),
821                        };
822
823                        if terms.iter().all(|t| term_matches(&owned, t)) {
824                            return Some(Ok(meta));
825                        }
826
827                        continue;
828                    }
829                    Err(e) => return Some(Err(e)),
830                },
831            }
832        }
833    }
834}
835
836struct JournalIter {
837    query: JournalQuery,
838    cursor_key: Option<(EntryMeta, bool)>, // (cursor meta, inclusive)
839    produced: usize,
840    last_emitted: Option<EntryMeta>,
841    forward_heap: BinaryHeap<Reverse<HeapItem>>,
842    reverse_heap: BinaryHeap<HeapItem>,
843    iters: Vec<FileMetaIter>,
844    pending_errors: Vec<SdJournalError>,
845    done: bool,
846}
847
848#[derive(Clone, Copy)]
849struct HeapItem {
850    meta: EntryMeta,
851    file_idx: usize,
852}
853
854impl PartialEq for HeapItem {
855    fn eq(&self, other: &Self) -> bool {
856        self.meta == other.meta && self.file_idx == other.file_idx
857    }
858}
859
860impl Eq for HeapItem {}
861
862impl PartialOrd for HeapItem {
863    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
864        Some(self.cmp(other))
865    }
866}
867
868impl Ord for HeapItem {
869    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
870        self.meta
871            .cmp_key(&other.meta)
872            .then_with(|| self.file_idx.cmp(&other.file_idx))
873    }
874}
875
876impl JournalIter {
877    fn new(query: JournalQuery) -> Result<Self> {
878        if matches!(query.limit, Some(0)) {
879            return Ok(Self {
880                query,
881                cursor_key: None,
882                produced: 0,
883                last_emitted: None,
884                forward_heap: BinaryHeap::new(),
885                reverse_heap: BinaryHeap::new(),
886                iters: Vec::new(),
887                pending_errors: Vec::new(),
888                done: true,
889            });
890        }
891
892        let mut pending_errors = Vec::new();
893        let cursor_key = build_cursor_key(&query)?;
894        let branches = build_branches(&query);
895
896        let mut iters = Vec::with_capacity(query.journal.inner.files.len());
897        for f in &query.journal.inner.files {
898            let mut branch_iters = Vec::with_capacity(branches.len());
899            for terms in &branches {
900                match FileBranchIter::new(
901                    f.clone(),
902                    terms.clone(),
903                    query.reverse,
904                    query.since_realtime,
905                    query.until_realtime,
906                ) {
907                    Ok(it) => branch_iters.push(it),
908                    Err(e) => pending_errors.push(e),
909                }
910            }
911            iters.push(FileMetaIter::from_branch_iters(branch_iters, query.reverse));
912        }
913
914        let mut forward_heap: BinaryHeap<Reverse<HeapItem>> = BinaryHeap::new();
915        let mut reverse_heap: BinaryHeap<HeapItem> = BinaryHeap::new();
916
917        for (idx, it) in iters.iter_mut().enumerate() {
918            if let Some(meta) = next_ok_meta(it, &mut pending_errors) {
919                let item = HeapItem {
920                    meta,
921                    file_idx: idx,
922                };
923                if query.reverse {
924                    reverse_heap.push(item);
925                } else {
926                    forward_heap.push(Reverse(item));
927                }
928            }
929        }
930
931        Ok(Self {
932            query,
933            cursor_key,
934            produced: 0,
935            last_emitted: None,
936            forward_heap,
937            reverse_heap,
938            iters,
939            pending_errors,
940            done: false,
941        })
942    }
943
944    fn pop_next(&mut self) -> Option<HeapItem> {
945        if self.query.reverse {
946            self.reverse_heap.pop()
947        } else {
948            self.forward_heap.pop().map(|r| r.0)
949        }
950    }
951
952    fn push_next(&mut self, item: HeapItem) {
953        if self.query.reverse {
954            self.reverse_heap.push(item);
955        } else {
956            self.forward_heap.push(Reverse(item));
957        }
958    }
959
960    fn passes_filters(&self, meta: &EntryMeta) -> bool {
961        if let Some((cursor_meta, inclusive)) = &self.cursor_key {
962            let ord = meta.cmp_key(cursor_meta);
963            if *inclusive {
964                if ord == std::cmp::Ordering::Less {
965                    return false;
966                }
967            } else if ord != std::cmp::Ordering::Greater {
968                return false;
969            }
970        }
971
972        if let Some(since) = self.query.since_realtime
973            && meta.realtime_usec < since
974        {
975            return false;
976        }
977        if let Some(until) = self.query.until_realtime
978            && meta.realtime_usec > until
979        {
980            return false;
981        }
982
983        true
984    }
985}
986
987impl Iterator for JournalIter {
988    type Item = Result<EntryRef>;
989
990    fn next(&mut self) -> Option<Self::Item> {
991        if self.done {
992            return None;
993        }
994
995        if let Some(err) = self.pending_errors.pop() {
996            return Some(Err(err));
997        }
998
999        if let Some(limit) = self.query.limit
1000            && (limit == 0 || self.produced >= limit)
1001        {
1002            self.done = true;
1003            return None;
1004        }
1005
1006        loop {
1007            let item = match self.pop_next() {
1008                Some(item) => item,
1009                None => {
1010                    self.done = true;
1011                    return None;
1012                }
1013            };
1014
1015            if let Some(next_meta) =
1016                next_ok_meta(&mut self.iters[item.file_idx], &mut self.pending_errors)
1017            {
1018                self.push_next(HeapItem {
1019                    meta: next_meta,
1020                    file_idx: item.file_idx,
1021                });
1022            }
1023
1024            if !self.passes_filters(&item.meta) {
1025                continue;
1026            }
1027
1028            if self.last_emitted == Some(item.meta) {
1029                continue;
1030            }
1031
1032            let file = &self.query.journal.inner.files[item.file_idx];
1033            let entry = match file.read_entry_ref(item.meta.entry_offset) {
1034                Ok(e) => e,
1035                Err(e) => {
1036                    self.pending_errors.push(e);
1037                    if let Some(err) = self.pending_errors.pop() {
1038                        return Some(Err(err));
1039                    }
1040                    continue;
1041                }
1042            };
1043
1044            self.last_emitted = Some(item.meta);
1045            self.produced = self.produced.saturating_add(1);
1046            return Some(Ok(entry));
1047        }
1048    }
1049}
1050
1051fn next_ok_meta<I>(it: &mut I, pending: &mut Vec<SdJournalError>) -> Option<EntryMeta>
1052where
1053    I: Iterator<Item = Result<EntryMeta>>,
1054{
1055    for item in it.by_ref() {
1056        match item {
1057            Ok(m) => return Some(m),
1058            Err(e) => pending.push(e),
1059        }
1060    }
1061    None
1062}
1063
1064fn build_cursor_key(query: &JournalQuery) -> Result<Option<(EntryMeta, bool)>> {
1065    let (cursor, inclusive) = match &query.cursor_start {
1066        Some(v) => v,
1067        None => return Ok(None),
1068    };
1069
1070    if let Some(k) = cursor.sdjournal_entry_key() {
1071        return Ok(Some((
1072            EntryMeta {
1073                file_id: k.file_id,
1074                entry_offset: k.entry_offset,
1075                seqnum: k.seqnum,
1076                realtime_usec: k.realtime_usec,
1077            },
1078            *inclusive,
1079        )));
1080    }
1081
1082    if let Some((file_id, entry_offset)) = cursor.file_offset() {
1083        let file = query
1084            .journal
1085            .inner
1086            .files
1087            .iter()
1088            .find(|f| f.file_id() == file_id)
1089            .ok_or(SdJournalError::NotFound)?;
1090
1091        let meta = file
1092            .read_entry_meta(entry_offset)
1093            .map_err(|_| SdJournalError::NotFound)?;
1094        return Ok(Some((meta, *inclusive)));
1095    }
1096
1097    if let Some(sys) = cursor.systemd() {
1098        let meta = resolve_systemd_cursor_key(query, sys)?;
1099        return Ok(Some((meta, *inclusive)));
1100    }
1101
1102    Err(SdJournalError::InvalidQuery {
1103        reason: "unsupported cursor format".to_string(),
1104    })
1105}
1106
1107fn resolve_systemd_cursor_key(
1108    query: &JournalQuery,
1109    sys: &crate::cursor::SystemdCursor,
1110) -> Result<EntryMeta> {
1111    match find_exact_systemd_cursor(query, sys) {
1112        Ok(Some(meta)) => return Ok(meta),
1113        Ok(None) => {}
1114        Err(e) => {
1115            if sys.realtime_usec.is_none() {
1116                return Err(e);
1117            }
1118        }
1119    }
1120
1121    let realtime_usec = sys.realtime_usec.ok_or(SdJournalError::NotFound)?;
1122    Ok(EntryMeta {
1123        file_id: [0u8; 16],
1124        entry_offset: 0,
1125        seqnum: sys.seqnum.unwrap_or(0),
1126        realtime_usec,
1127    })
1128}
1129
1130fn find_exact_systemd_cursor(
1131    query: &JournalQuery,
1132    sys: &crate::cursor::SystemdCursor,
1133) -> Result<Option<EntryMeta>> {
1134    let mut candidates: Vec<&crate::file::JournalFile> = Vec::new();
1135    if let Some(seqnum_id) = sys.seqnum_id {
1136        for f in &query.journal.inner.files {
1137            if f.seqnum_id() == seqnum_id {
1138                candidates.push(f);
1139            }
1140        }
1141        if candidates.is_empty() {
1142            candidates.extend(query.journal.inner.files.iter());
1143        }
1144    } else {
1145        candidates.extend(query.journal.inner.files.iter());
1146    }
1147
1148    let mut first_error: Option<SdJournalError> = None;
1149
1150    for file in candidates {
1151        match find_exact_systemd_cursor_in_file(file, sys) {
1152            Ok(Some(meta)) => return Ok(Some(meta)),
1153            Ok(None) => {}
1154            Err(e) => {
1155                if first_error.is_none() {
1156                    first_error = Some(e);
1157                }
1158            }
1159        }
1160    }
1161
1162    match first_error {
1163        Some(e) => Err(e),
1164        None => Ok(None),
1165    }
1166}
1167
1168fn find_exact_systemd_cursor_in_file(
1169    file: &crate::file::JournalFile,
1170    sys: &crate::cursor::SystemdCursor,
1171) -> Result<Option<EntryMeta>> {
1172    if let Some(seqnum_id) = sys.seqnum_id
1173        && file.seqnum_id() != seqnum_id
1174    {
1175        return Ok(None);
1176    }
1177
1178    let iter = file.entry_iter_seek_realtime(false, sys.realtime_usec, None)?;
1179    for item in iter {
1180        let meta = item?;
1181
1182        if let Some(want_realtime) = sys.realtime_usec
1183            && meta.realtime_usec != want_realtime
1184        {
1185            continue;
1186        }
1187        if let Some(want_seqnum) = sys.seqnum
1188            && meta.seqnum != want_seqnum
1189        {
1190            continue;
1191        }
1192
1193        let fields = file.read_entry_cursor_fields(meta.entry_offset)?;
1194
1195        if let Some(want_realtime) = sys.realtime_usec
1196            && fields.realtime_usec != want_realtime
1197        {
1198            continue;
1199        }
1200        if let Some(want_seqnum) = sys.seqnum
1201            && fields.seqnum != want_seqnum
1202        {
1203            continue;
1204        }
1205        if let Some(want_boot_id) = sys.boot_id
1206            && fields.boot_id != want_boot_id
1207        {
1208            continue;
1209        }
1210        if let Some(want_monotonic) = sys.monotonic_usec
1211            && fields.monotonic_usec != want_monotonic
1212        {
1213            continue;
1214        }
1215        if let Some(want_xor) = sys.xor_hash
1216            && fields.xor_hash != want_xor
1217        {
1218            continue;
1219        }
1220
1221        return Ok(Some(meta));
1222    }
1223
1224    Ok(None)
1225}
1226
1227fn term_matches(entry: &EntryOwned, term: &MatchTerm) -> bool {
1228    match term {
1229        MatchTerm::Exact { field, value, .. } => entry
1230            .iter_fields()
1231            .any(|(k, v)| k == field.as_str() && v == value.as_slice()),
1232        MatchTerm::Present { field } => entry.get(field).is_some(),
1233    }
1234}
1235
#[cfg(test)]
mod tests {
    use super::*;
    use crate::JournalConfig;
    use crate::journal::JournalInner;
    use std::sync::Arc;

    /// Builds a `Journal` with the given config and no roots or files.
    fn empty_journal_with_config(config: JournalConfig) -> Journal {
        let inner = JournalInner {
            config,
            roots: Vec::new(),
            files: Vec::new(),
        };
        Journal {
            inner: Arc::new(inner),
        }
    }

    /// A field name containing `=` is remembered as invalid, and `iter()`
    /// reports it as `InvalidQuery`.
    #[test]
    fn invalid_field_name_rejected_on_iter() {
        let mut query = JournalQuery::new(empty_journal_with_config(JournalConfig::default()));
        query.match_exact("BAD=FIELD", b"x");

        let Err(err) = query.iter() else {
            panic!("expected InvalidQuery");
        };
        assert!(matches!(err, SdJournalError::InvalidQuery { .. }));
    }

    /// Adding more terms than `max_query_terms` makes `iter()` fail with a
    /// `QueryTerms` limit error.
    #[test]
    fn too_many_terms_rejected_on_iter() {
        let config = JournalConfig {
            max_query_terms: 1,
            ..Default::default()
        };
        let mut query = JournalQuery::new(empty_journal_with_config(config));
        query.match_present("A");
        query.match_present("B");

        let Err(err) = query.iter() else {
            panic!("expected QueryTerms limit error");
        };
        assert!(matches!(
            err,
            SdJournalError::LimitExceeded {
                kind: LimitKind::QueryTerms,
                ..
            }
        ));
    }
}