1use crate::cursor::Cursor;
2use crate::entry::{EntryOwned, EntryRef};
3use crate::error::{LimitKind, Result, SdJournalError};
4use crate::file::{DataEntryOffsetIter, DataObjectRef, EntryMeta, FileEntryIter};
5use crate::follow::Follow;
6use crate::journal::Journal;
7use crate::util::is_ascii_field_name;
8use std::cmp::Reverse;
9use std::collections::BinaryHeap;
10
/// A single predicate evaluated against a journal entry.
#[derive(Clone, Debug)]
enum MatchTerm {
    /// The entry must carry `field` with exactly the bytes in `value`.
    Exact {
        /// Field name as supplied by the caller.
        field: String,
        /// Expected raw value of the field.
        value: Vec<u8>,
        /// `field`, a `=` byte and `value` concatenated; this is what gets
        /// looked up in a file's data objects.
        payload: Vec<u8>,
    },
    /// The entry must carry `field`, whatever its value.
    Present {
        /// Field name as supplied by the caller.
        field: String,
    },
}
22
23#[derive(Clone)]
25pub struct JournalQuery {
26 journal: Journal,
27
28 global_terms: Vec<MatchTerm>,
29 or_groups: Vec<Vec<MatchTerm>>,
30
31 since_realtime: Option<u64>,
32 until_realtime: Option<u64>,
33 cursor_start: Option<(Cursor, bool)>, reverse: bool,
35 limit: Option<usize>,
36 invalid_reason: Option<String>,
37 too_many_terms: bool,
38}
39
40impl JournalQuery {
41 pub(crate) fn new(journal: Journal) -> Self {
42 Self {
43 journal,
44 global_terms: Vec::new(),
45 or_groups: Vec::new(),
46 since_realtime: None,
47 until_realtime: None,
48 cursor_start: None,
49 reverse: false,
50 limit: None,
51 invalid_reason: None,
52 too_many_terms: false,
53 }
54 }
55
56 pub fn match_exact(&mut self, field: &str, value: &[u8]) -> &mut Self {
57 if self.invalid_reason.is_some() {
58 return self;
59 }
60 if let Err(e) = validate_field_name(field, &self.journal.inner.config) {
61 self.invalid_reason = Some(e.to_string());
62 return self;
63 }
64 if self.count_terms() >= self.journal.inner.config.max_query_terms {
65 self.too_many_terms = true;
66 return self;
67 }
68
69 let mut payload =
70 Vec::with_capacity(field.len().saturating_add(1).saturating_add(value.len()));
71 payload.extend_from_slice(field.as_bytes());
72 payload.push(b'=');
73 payload.extend_from_slice(value);
74
75 self.global_terms.push(MatchTerm::Exact {
76 field: field.to_string(),
77 value: value.to_vec(),
78 payload,
79 });
80 self
81 }
82
83 pub fn match_present(&mut self, field: &str) -> &mut Self {
84 if self.invalid_reason.is_some() {
85 return self;
86 }
87 if let Err(e) = validate_field_name(field, &self.journal.inner.config) {
88 self.invalid_reason = Some(e.to_string());
89 return self;
90 }
91 if self.count_terms() >= self.journal.inner.config.max_query_terms {
92 self.too_many_terms = true;
93 return self;
94 }
95
96 self.global_terms.push(MatchTerm::Present {
97 field: field.to_string(),
98 });
99 self
100 }
101
102 pub fn or_group<F>(&mut self, f: F) -> &mut Self
103 where
104 F: FnOnce(&mut OrGroupBuilder),
105 {
106 if self.invalid_reason.is_some() {
107 return self;
108 }
109 let remaining = self
110 .journal
111 .inner
112 .config
113 .max_query_terms
114 .saturating_sub(self.count_terms());
115 let mut b = OrGroupBuilder {
116 terms: Vec::new(),
117 config: self.journal.inner.config.clone(),
118 invalid_reason: None,
119 too_many_terms: false,
120 remaining,
121 };
122 f(&mut b);
123 if let Some(r) = b.invalid_reason {
124 self.invalid_reason = Some(r);
125 return self;
126 }
127 if b.too_many_terms {
128 self.too_many_terms = true;
129 return self;
130 }
131 if !b.terms.is_empty() {
132 self.or_groups.push(b.terms);
133 }
134 self
135 }
136
137 pub fn since_realtime(&mut self, usec: u64) -> &mut Self {
138 self.since_realtime = Some(usec);
139 self
140 }
141
142 pub fn until_realtime(&mut self, usec: u64) -> &mut Self {
143 self.until_realtime = Some(usec);
144 self
145 }
146
147 pub fn after_cursor(&mut self, cursor: Cursor) -> &mut Self {
148 self.cursor_start = Some((cursor, false));
149 self
150 }
151
152 pub fn seek_head(&mut self) -> &mut Self {
156 self.cursor_start = None;
157 self.reverse = false;
158 self
159 }
160
161 pub fn seek_tail(&mut self) -> &mut Self {
165 self.cursor_start = None;
166 self.reverse = true;
167 self
168 }
169
170 pub fn reverse(&mut self, reverse: bool) -> &mut Self {
171 self.reverse = reverse;
172 self
173 }
174
175 pub fn limit(&mut self, n: usize) -> &mut Self {
176 self.limit = Some(n);
177 self
178 }
179
180 pub fn iter(&self) -> Result<impl Iterator<Item = Result<EntryRef>> + use<>> {
181 self.validate()?;
182 JournalIter::new(self.clone())
183 }
184
185 pub fn collect_owned(&self) -> Result<Vec<EntryOwned>> {
186 let mut out = Vec::new();
187 for item in self.iter()? {
188 let entry = item?;
189 out.push(entry.to_owned());
190 }
191 Ok(out)
192 }
193
194 pub fn follow(&self) -> Result<Follow> {
195 self.validate()?;
196 self.validate_follow()?;
197
198 let roots = self.journal.inner.roots.clone();
199 let config = self.journal.inner.config.clone();
200
201 let live_journal = Journal::open_dirs_with_config(&roots, config.clone())?;
202 let mut template = self.with_journal(live_journal.clone());
203 template.limit = None;
204
205 let mut catchup_query = self.with_journal(live_journal);
206 let mut last_cursor: Option<Cursor> = None;
207
208 let has_lower_bound = self.cursor_start.is_some() || self.since_realtime.is_some();
209 if !has_lower_bound {
210 let mut tail_probe = template.clone();
211 tail_probe.reverse(true);
212 tail_probe.limit(1);
213
214 for item in tail_probe.iter()? {
215 match item {
216 Ok(entry) => {
217 let c = entry.cursor()?;
218 catchup_query.set_cursor_start(c.clone(), false)?;
219 last_cursor = Some(c);
220 break;
221 }
222 Err(_) => continue,
223 }
224 }
225 }
226
227 let catchup_iter: Box<dyn Iterator<Item = Result<EntryRef>> + Send> =
228 Box::new(catchup_query.iter()?);
229 Ok(Follow::new(
230 roots,
231 config,
232 template,
233 catchup_iter,
234 last_cursor,
235 ))
236 }
237
238 #[cfg(feature = "tokio")]
240 pub fn follow_tokio(&self) -> Result<crate::follow::TokioFollow> {
241 Ok(crate::follow::TokioFollow::spawn(self.follow()?))
242 }
243
244 pub(crate) fn set_cursor_start(&mut self, cursor: Cursor, inclusive: bool) -> Result<()> {
245 self.cursor_start = Some((cursor, inclusive));
246 Ok(())
247 }
248
249 pub(crate) fn with_journal(&self, journal: Journal) -> Self {
250 let mut q = self.clone();
251 q.journal = journal;
252 q
253 }
254
255 fn validate(&self) -> Result<()> {
256 if let Some(reason) = &self.invalid_reason {
257 return Err(SdJournalError::InvalidQuery {
258 reason: reason.clone(),
259 });
260 }
261 if self.too_many_terms {
262 return Err(SdJournalError::LimitExceeded {
263 kind: LimitKind::QueryTerms,
264 limit: u64::try_from(self.journal.inner.config.max_query_terms).unwrap_or(u64::MAX),
265 });
266 }
267
268 if let (Some(since), Some(until)) = (self.since_realtime, self.until_realtime)
269 && since > until
270 {
271 return Err(SdJournalError::InvalidQuery {
272 reason: "since_realtime must be <= until_realtime".to_string(),
273 });
274 }
275
276 Ok(())
277 }
278
279 fn validate_follow(&self) -> Result<()> {
280 if self.reverse {
281 return Err(SdJournalError::InvalidQuery {
282 reason: "follow() requires reverse=false".to_string(),
283 });
284 }
285 if self.until_realtime.is_some() {
286 return Err(SdJournalError::InvalidQuery {
287 reason: "follow() does not allow until_realtime".to_string(),
288 });
289 }
290 Ok(())
291 }
292
293 fn count_terms(&self) -> usize {
294 let mut n = self.global_terms.len();
295 for g in &self.or_groups {
296 n = n.saturating_add(g.len());
297 }
298 n
299 }
300}
301
302pub struct OrGroupBuilder {
303 terms: Vec<MatchTerm>,
304 config: crate::config::JournalConfig,
305 invalid_reason: Option<String>,
306 too_many_terms: bool,
307 remaining: usize,
308}
309
310impl OrGroupBuilder {
311 pub fn match_exact(&mut self, field: &str, value: &[u8]) -> &mut Self {
312 if self.invalid_reason.is_some() {
313 return self;
314 }
315 if let Err(e) = validate_field_name(field, &self.config) {
316 self.invalid_reason = Some(e.to_string());
317 return self;
318 }
319 if self.terms.len() >= self.remaining {
320 self.too_many_terms = true;
321 return self;
322 }
323 let mut payload =
324 Vec::with_capacity(field.len().saturating_add(1).saturating_add(value.len()));
325 payload.extend_from_slice(field.as_bytes());
326 payload.push(b'=');
327 payload.extend_from_slice(value);
328
329 self.terms.push(MatchTerm::Exact {
330 field: field.to_string(),
331 value: value.to_vec(),
332 payload,
333 });
334 self
335 }
336
337 pub fn match_present(&mut self, field: &str) -> &mut Self {
338 if self.invalid_reason.is_some() {
339 return self;
340 }
341 if let Err(e) = validate_field_name(field, &self.config) {
342 self.invalid_reason = Some(e.to_string());
343 return self;
344 }
345 if self.terms.len() >= self.remaining {
346 self.too_many_terms = true;
347 return self;
348 }
349 self.terms.push(MatchTerm::Present {
350 field: field.to_string(),
351 });
352 self
353 }
354}
355
356fn validate_field_name(field: &str, config: &crate::config::JournalConfig) -> Result<()> {
357 if field.len() > config.max_field_name_len {
358 return Err(SdJournalError::InvalidQuery {
359 reason: "field name too long".to_string(),
360 });
361 }
362 if !is_ascii_field_name(field.as_bytes()) {
363 return Err(SdJournalError::InvalidQuery {
364 reason: "field name must be ASCII and must not contain '='".to_string(),
365 });
366 }
367 Ok(())
368}
369
370fn build_branches(query: &JournalQuery) -> Vec<Vec<MatchTerm>> {
371 if query.or_groups.is_empty() {
372 return vec![query.global_terms.clone()];
373 }
374
375 let mut out = Vec::with_capacity(query.or_groups.len());
376 for group in &query.or_groups {
377 let mut terms = query.global_terms.clone();
378 terms.extend_from_slice(group);
379 out.push(terms);
380 }
381 out
382}
383
384enum FileMetaIter {
385 Empty,
386 Single(FileBranchIter),
387 Or(FileOrIter),
388}
389
390impl FileMetaIter {
391 fn from_branch_iters(mut iters: Vec<FileBranchIter>, reverse: bool) -> Self {
392 iters.retain(|it| !matches!(&it.kind, BranchKind::Empty));
393 match iters.len() {
394 0 => FileMetaIter::Empty,
395 1 => FileMetaIter::Single(iters.remove(0)),
396 _ => FileMetaIter::Or(FileOrIter::new(iters, reverse)),
397 }
398 }
399}
400
401impl Iterator for FileMetaIter {
402 type Item = Result<EntryMeta>;
403
404 fn next(&mut self) -> Option<Self::Item> {
405 match self {
406 FileMetaIter::Empty => None,
407 FileMetaIter::Single(it) => it.next(),
408 FileMetaIter::Or(it) => it.next(),
409 }
410 }
411}
412
413struct FileOrIter {
414 reverse: bool,
415 forward_heap: BinaryHeap<Reverse<FileOrHeapItem>>,
416 reverse_heap: BinaryHeap<FileOrHeapItem>,
417 iters: Vec<FileBranchIter>,
418 pending_errors: Vec<SdJournalError>,
419 done: bool,
420}
421
422#[derive(Clone, Copy)]
423struct FileOrHeapItem {
424 meta: EntryMeta,
425 branch_idx: usize,
426}
427
428impl PartialEq for FileOrHeapItem {
429 fn eq(&self, other: &Self) -> bool {
430 self.meta == other.meta && self.branch_idx == other.branch_idx
431 }
432}
433
434impl Eq for FileOrHeapItem {}
435
436impl PartialOrd for FileOrHeapItem {
437 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
438 Some(self.cmp(other))
439 }
440}
441
442impl Ord for FileOrHeapItem {
443 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
444 self.meta.cmp_key(&other.meta)
445 }
446}
447
448impl FileOrIter {
449 fn new(mut iters: Vec<FileBranchIter>, reverse: bool) -> Self {
450 let mut pending_errors = Vec::new();
451 let mut forward_heap: BinaryHeap<Reverse<FileOrHeapItem>> = BinaryHeap::new();
452 let mut reverse_heap: BinaryHeap<FileOrHeapItem> = BinaryHeap::new();
453
454 for (idx, it) in iters.iter_mut().enumerate() {
455 if let Some(meta) = next_ok_meta(it, &mut pending_errors) {
456 let item = FileOrHeapItem {
457 meta,
458 branch_idx: idx,
459 };
460 if reverse {
461 reverse_heap.push(item);
462 } else {
463 forward_heap.push(Reverse(item));
464 }
465 }
466 }
467
468 Self {
469 reverse,
470 forward_heap,
471 reverse_heap,
472 iters,
473 pending_errors,
474 done: false,
475 }
476 }
477
478 fn pop_next(&mut self) -> Option<FileOrHeapItem> {
479 if self.reverse {
480 self.reverse_heap.pop()
481 } else {
482 self.forward_heap.pop().map(|r| r.0)
483 }
484 }
485
486 fn push_next(&mut self, item: FileOrHeapItem) {
487 if self.reverse {
488 self.reverse_heap.push(item);
489 } else {
490 self.forward_heap.push(Reverse(item));
491 }
492 }
493}
494
495impl Iterator for FileOrIter {
496 type Item = Result<EntryMeta>;
497
498 fn next(&mut self) -> Option<Self::Item> {
499 if self.done {
500 return None;
501 }
502
503 if let Some(err) = self.pending_errors.pop() {
504 return Some(Err(err));
505 }
506
507 let item = match self.pop_next() {
508 Some(item) => item,
509 None => {
510 self.done = true;
511 return None;
512 }
513 };
514
515 if let Some(next_meta) =
516 next_ok_meta(&mut self.iters[item.branch_idx], &mut self.pending_errors)
517 {
518 self.push_next(FileOrHeapItem {
519 meta: next_meta,
520 branch_idx: item.branch_idx,
521 });
522 }
523
524 Some(Ok(item.meta))
525 }
526}
527
528struct AndOffsetIter {
529 reverse: bool,
530 iters: Vec<DataEntryOffsetIter>,
531 cursors: Vec<Option<u64>>,
532 initialized: bool,
533 pending_error: Option<SdJournalError>,
534 done: bool,
535}
536
537impl AndOffsetIter {
538 fn new(iters: Vec<DataEntryOffsetIter>, reverse: bool) -> Self {
539 let cursors = vec![None; iters.len()];
540 Self {
541 reverse,
542 iters,
543 cursors,
544 initialized: false,
545 pending_error: None,
546 done: false,
547 }
548 }
549
550 fn init(&mut self) -> Option<Result<()>> {
551 if self.initialized {
552 return Some(Ok(()));
553 }
554 for i in 0..self.iters.len() {
555 match self.iters[i].next() {
556 Some(Ok(v)) => self.cursors[i] = Some(v),
557 Some(Err(e)) => return Some(Err(e)),
558 None => return None,
559 }
560 }
561 self.initialized = true;
562 Some(Ok(()))
563 }
564
565 fn target(&self) -> Option<u64> {
566 let mut it = self.cursors.iter().copied();
567 let mut target = it.next()??;
568 for v in it {
569 let v = v?;
570 target = if self.reverse {
571 target.min(v)
572 } else {
573 target.max(v)
574 };
575 }
576 Some(target)
577 }
578
579 fn advance_to(&mut self, idx: usize, target: u64) -> Option<Result<()>> {
580 loop {
581 let cur = self.cursors.get(idx).copied().flatten()?;
582
583 let needs_advance = if self.reverse {
584 cur > target
585 } else {
586 cur < target
587 };
588 if !needs_advance {
589 return Some(Ok(()));
590 }
591
592 match self.iters[idx].next() {
593 Some(Ok(v)) => self.cursors[idx] = Some(v),
594 Some(Err(e)) => return Some(Err(e)),
595 None => {
596 self.cursors[idx] = None;
597 return None;
598 }
599 }
600 }
601 }
602}
603
604impl Iterator for AndOffsetIter {
605 type Item = Result<u64>;
606
607 fn next(&mut self) -> Option<Self::Item> {
608 if self.done {
609 return None;
610 }
611
612 if let Some(err) = self.pending_error.take() {
613 self.done = true;
614 return Some(Err(err));
615 }
616
617 match self.init() {
618 Some(Ok(())) => {}
619 Some(Err(e)) => {
620 self.done = true;
621 return Some(Err(e));
622 }
623 None => {
624 self.done = true;
625 return None;
626 }
627 }
628
629 loop {
630 let target = match self.target() {
631 Some(v) => v,
632 None => {
633 self.done = true;
634 return None;
635 }
636 };
637
638 for i in 0..self.iters.len() {
639 match self.advance_to(i, target) {
640 Some(Ok(())) => {}
641 Some(Err(e)) => {
642 self.done = true;
643 return Some(Err(e));
644 }
645 None => {
646 self.done = true;
647 return None;
648 }
649 }
650 }
651
652 let first = self.cursors.first().copied().flatten();
653 let all_equal =
654 first.is_some() && self.cursors.iter().all(|v| v.is_some() && *v == first);
655
656 if !all_equal {
657 continue;
658 }
659
660 let out = first.unwrap_or(0);
661 for i in 0..self.iters.len() {
662 match self.iters[i].next() {
663 Some(Ok(v)) => self.cursors[i] = Some(v),
664 Some(Err(e)) => {
665 self.cursors[i] = None;
666 self.pending_error = Some(e);
667 }
668 None => self.cursors[i] = None,
669 }
670 }
671
672 return Some(Ok(out));
673 }
674 }
675}
676
677struct FileBranchIter {
678 file: crate::file::JournalFile,
679 kind: BranchKind,
680}
681
682enum BranchKind {
683 Empty,
684 Indexed {
685 offset_iter: AndOffsetIter,
686 present_fields: Vec<String>,
687 },
688 Scan {
689 iter: FileEntryIter,
690 terms: Vec<MatchTerm>,
691 },
692}
693
694impl FileBranchIter {
695 fn new(
696 file: crate::file::JournalFile,
697 terms: Vec<MatchTerm>,
698 reverse: bool,
699 since_realtime: Option<u64>,
700 until_realtime: Option<u64>,
701 ) -> Result<Self> {
702 let mut present_fields = Vec::new();
703 let mut exact_terms = Vec::new();
704
705 for t in &terms {
706 match t {
707 MatchTerm::Exact { .. } => exact_terms.push(t),
708 MatchTerm::Present { field } => present_fields.push(field.clone()),
709 }
710 }
711
712 if exact_terms.is_empty() {
713 let iter = file.entry_iter_seek_realtime(reverse, since_realtime, until_realtime)?;
714 return Ok(Self {
715 file,
716 kind: BranchKind::Scan { iter, terms },
717 });
718 }
719
720 let mut data_refs: Vec<DataObjectRef> = Vec::new();
721 for t in &exact_terms {
722 let payload = match t {
723 MatchTerm::Exact { payload, .. } => payload.as_slice(),
724 _ => continue,
725 };
726
727 match file.find_data_object(payload) {
728 Ok(Some(d)) => data_refs.push(d),
729 Ok(None) => {
730 return Ok(Self {
731 file,
732 kind: BranchKind::Empty,
733 });
734 }
735 Err(_) => {
736 let iter =
737 file.entry_iter_seek_realtime(reverse, since_realtime, until_realtime)?;
738 return Ok(Self {
739 file,
740 kind: BranchKind::Scan { iter, terms },
741 });
742 }
743 }
744 }
745
746 data_refs.sort_by_key(|d| d.n_entries);
747
748 let mut iters = Vec::with_capacity(data_refs.len());
749 for d in data_refs {
750 match file.data_entry_offsets(d, reverse) {
751 Ok(it) => iters.push(it),
752 Err(_) => {
753 let iter =
754 file.entry_iter_seek_realtime(reverse, since_realtime, until_realtime)?;
755 return Ok(Self {
756 file,
757 kind: BranchKind::Scan { iter, terms },
758 });
759 }
760 }
761 }
762
763 Ok(Self {
764 file,
765 kind: BranchKind::Indexed {
766 offset_iter: AndOffsetIter::new(iters, reverse),
767 present_fields,
768 },
769 })
770 }
771}
772
773impl Iterator for FileBranchIter {
774 type Item = Result<EntryMeta>;
775
776 fn next(&mut self) -> Option<Self::Item> {
777 loop {
778 match &mut self.kind {
779 BranchKind::Empty => return None,
780 BranchKind::Indexed {
781 offset_iter,
782 present_fields,
783 } => {
784 let entry_offset = match offset_iter.next()? {
785 Ok(v) => v,
786 Err(e) => {
787 self.kind = BranchKind::Empty;
788 return Some(Err(e));
789 }
790 };
791
792 let meta = match self.file.read_entry_meta(entry_offset) {
793 Ok(m) => m,
794 Err(e) => return Some(Err(e)),
795 };
796
797 if present_fields.is_empty() {
798 return Some(Ok(meta));
799 }
800
801 let owned = match self.file.read_entry_owned(entry_offset) {
802 Ok(e) => e,
803 Err(e) => return Some(Err(e)),
804 };
805
806 if present_fields.iter().all(|f| owned.get(f).is_some()) {
807 return Some(Ok(meta));
808 }
809
810 continue;
811 }
812 BranchKind::Scan { iter, terms } => match iter.next()? {
813 Ok(meta) => {
814 if terms.is_empty() {
815 return Some(Ok(meta));
816 }
817
818 let owned = match self.file.read_entry_owned(meta.entry_offset) {
819 Ok(e) => e,
820 Err(e) => return Some(Err(e)),
821 };
822
823 if terms.iter().all(|t| term_matches(&owned, t)) {
824 return Some(Ok(meta));
825 }
826
827 continue;
828 }
829 Err(e) => return Some(Err(e)),
830 },
831 }
832 }
833 }
834}
835
836struct JournalIter {
837 query: JournalQuery,
838 cursor_key: Option<(EntryMeta, bool)>, produced: usize,
840 last_emitted: Option<EntryMeta>,
841 forward_heap: BinaryHeap<Reverse<HeapItem>>,
842 reverse_heap: BinaryHeap<HeapItem>,
843 iters: Vec<FileMetaIter>,
844 pending_errors: Vec<SdJournalError>,
845 done: bool,
846}
847
848#[derive(Clone, Copy)]
849struct HeapItem {
850 meta: EntryMeta,
851 file_idx: usize,
852}
853
854impl PartialEq for HeapItem {
855 fn eq(&self, other: &Self) -> bool {
856 self.meta == other.meta && self.file_idx == other.file_idx
857 }
858}
859
860impl Eq for HeapItem {}
861
862impl PartialOrd for HeapItem {
863 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
864 Some(self.cmp(other))
865 }
866}
867
868impl Ord for HeapItem {
869 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
870 self.meta
871 .cmp_key(&other.meta)
872 .then_with(|| self.file_idx.cmp(&other.file_idx))
873 }
874}
875
876impl JournalIter {
877 fn new(query: JournalQuery) -> Result<Self> {
878 if matches!(query.limit, Some(0)) {
879 return Ok(Self {
880 query,
881 cursor_key: None,
882 produced: 0,
883 last_emitted: None,
884 forward_heap: BinaryHeap::new(),
885 reverse_heap: BinaryHeap::new(),
886 iters: Vec::new(),
887 pending_errors: Vec::new(),
888 done: true,
889 });
890 }
891
892 let mut pending_errors = Vec::new();
893 let cursor_key = build_cursor_key(&query)?;
894 let branches = build_branches(&query);
895
896 let mut iters = Vec::with_capacity(query.journal.inner.files.len());
897 for f in &query.journal.inner.files {
898 let mut branch_iters = Vec::with_capacity(branches.len());
899 for terms in &branches {
900 match FileBranchIter::new(
901 f.clone(),
902 terms.clone(),
903 query.reverse,
904 query.since_realtime,
905 query.until_realtime,
906 ) {
907 Ok(it) => branch_iters.push(it),
908 Err(e) => pending_errors.push(e),
909 }
910 }
911 iters.push(FileMetaIter::from_branch_iters(branch_iters, query.reverse));
912 }
913
914 let mut forward_heap: BinaryHeap<Reverse<HeapItem>> = BinaryHeap::new();
915 let mut reverse_heap: BinaryHeap<HeapItem> = BinaryHeap::new();
916
917 for (idx, it) in iters.iter_mut().enumerate() {
918 if let Some(meta) = next_ok_meta(it, &mut pending_errors) {
919 let item = HeapItem {
920 meta,
921 file_idx: idx,
922 };
923 if query.reverse {
924 reverse_heap.push(item);
925 } else {
926 forward_heap.push(Reverse(item));
927 }
928 }
929 }
930
931 Ok(Self {
932 query,
933 cursor_key,
934 produced: 0,
935 last_emitted: None,
936 forward_heap,
937 reverse_heap,
938 iters,
939 pending_errors,
940 done: false,
941 })
942 }
943
944 fn pop_next(&mut self) -> Option<HeapItem> {
945 if self.query.reverse {
946 self.reverse_heap.pop()
947 } else {
948 self.forward_heap.pop().map(|r| r.0)
949 }
950 }
951
952 fn push_next(&mut self, item: HeapItem) {
953 if self.query.reverse {
954 self.reverse_heap.push(item);
955 } else {
956 self.forward_heap.push(Reverse(item));
957 }
958 }
959
960 fn passes_filters(&self, meta: &EntryMeta) -> bool {
961 if let Some((cursor_meta, inclusive)) = &self.cursor_key {
962 let ord = meta.cmp_key(cursor_meta);
963 if *inclusive {
964 if ord == std::cmp::Ordering::Less {
965 return false;
966 }
967 } else if ord != std::cmp::Ordering::Greater {
968 return false;
969 }
970 }
971
972 if let Some(since) = self.query.since_realtime
973 && meta.realtime_usec < since
974 {
975 return false;
976 }
977 if let Some(until) = self.query.until_realtime
978 && meta.realtime_usec > until
979 {
980 return false;
981 }
982
983 true
984 }
985}
986
987impl Iterator for JournalIter {
988 type Item = Result<EntryRef>;
989
990 fn next(&mut self) -> Option<Self::Item> {
991 if self.done {
992 return None;
993 }
994
995 if let Some(err) = self.pending_errors.pop() {
996 return Some(Err(err));
997 }
998
999 if let Some(limit) = self.query.limit
1000 && (limit == 0 || self.produced >= limit)
1001 {
1002 self.done = true;
1003 return None;
1004 }
1005
1006 loop {
1007 let item = match self.pop_next() {
1008 Some(item) => item,
1009 None => {
1010 self.done = true;
1011 return None;
1012 }
1013 };
1014
1015 if let Some(next_meta) =
1016 next_ok_meta(&mut self.iters[item.file_idx], &mut self.pending_errors)
1017 {
1018 self.push_next(HeapItem {
1019 meta: next_meta,
1020 file_idx: item.file_idx,
1021 });
1022 }
1023
1024 if !self.passes_filters(&item.meta) {
1025 continue;
1026 }
1027
1028 if self.last_emitted == Some(item.meta) {
1029 continue;
1030 }
1031
1032 let file = &self.query.journal.inner.files[item.file_idx];
1033 let entry = match file.read_entry_ref(item.meta.entry_offset) {
1034 Ok(e) => e,
1035 Err(e) => {
1036 self.pending_errors.push(e);
1037 if let Some(err) = self.pending_errors.pop() {
1038 return Some(Err(err));
1039 }
1040 continue;
1041 }
1042 };
1043
1044 self.last_emitted = Some(item.meta);
1045 self.produced = self.produced.saturating_add(1);
1046 return Some(Ok(entry));
1047 }
1048 }
1049}
1050
1051fn next_ok_meta<I>(it: &mut I, pending: &mut Vec<SdJournalError>) -> Option<EntryMeta>
1052where
1053 I: Iterator<Item = Result<EntryMeta>>,
1054{
1055 for item in it.by_ref() {
1056 match item {
1057 Ok(m) => return Some(m),
1058 Err(e) => pending.push(e),
1059 }
1060 }
1061 None
1062}
1063
1064fn build_cursor_key(query: &JournalQuery) -> Result<Option<(EntryMeta, bool)>> {
1065 let (cursor, inclusive) = match &query.cursor_start {
1066 Some(v) => v,
1067 None => return Ok(None),
1068 };
1069
1070 if let Some(k) = cursor.sdjournal_entry_key() {
1071 return Ok(Some((
1072 EntryMeta {
1073 file_id: k.file_id,
1074 entry_offset: k.entry_offset,
1075 seqnum: k.seqnum,
1076 realtime_usec: k.realtime_usec,
1077 },
1078 *inclusive,
1079 )));
1080 }
1081
1082 if let Some((file_id, entry_offset)) = cursor.file_offset() {
1083 let file = query
1084 .journal
1085 .inner
1086 .files
1087 .iter()
1088 .find(|f| f.file_id() == file_id)
1089 .ok_or(SdJournalError::NotFound)?;
1090
1091 let meta = file
1092 .read_entry_meta(entry_offset)
1093 .map_err(|_| SdJournalError::NotFound)?;
1094 return Ok(Some((meta, *inclusive)));
1095 }
1096
1097 if let Some(sys) = cursor.systemd() {
1098 let meta = resolve_systemd_cursor_key(query, sys)?;
1099 return Ok(Some((meta, *inclusive)));
1100 }
1101
1102 Err(SdJournalError::InvalidQuery {
1103 reason: "unsupported cursor format".to_string(),
1104 })
1105}
1106
1107fn resolve_systemd_cursor_key(
1108 query: &JournalQuery,
1109 sys: &crate::cursor::SystemdCursor,
1110) -> Result<EntryMeta> {
1111 match find_exact_systemd_cursor(query, sys) {
1112 Ok(Some(meta)) => return Ok(meta),
1113 Ok(None) => {}
1114 Err(e) => {
1115 if sys.realtime_usec.is_none() {
1116 return Err(e);
1117 }
1118 }
1119 }
1120
1121 let realtime_usec = sys.realtime_usec.ok_or(SdJournalError::NotFound)?;
1122 Ok(EntryMeta {
1123 file_id: [0u8; 16],
1124 entry_offset: 0,
1125 seqnum: sys.seqnum.unwrap_or(0),
1126 realtime_usec,
1127 })
1128}
1129
1130fn find_exact_systemd_cursor(
1131 query: &JournalQuery,
1132 sys: &crate::cursor::SystemdCursor,
1133) -> Result<Option<EntryMeta>> {
1134 let mut candidates: Vec<&crate::file::JournalFile> = Vec::new();
1135 if let Some(seqnum_id) = sys.seqnum_id {
1136 for f in &query.journal.inner.files {
1137 if f.seqnum_id() == seqnum_id {
1138 candidates.push(f);
1139 }
1140 }
1141 if candidates.is_empty() {
1142 candidates.extend(query.journal.inner.files.iter());
1143 }
1144 } else {
1145 candidates.extend(query.journal.inner.files.iter());
1146 }
1147
1148 let mut first_error: Option<SdJournalError> = None;
1149
1150 for file in candidates {
1151 match find_exact_systemd_cursor_in_file(file, sys) {
1152 Ok(Some(meta)) => return Ok(Some(meta)),
1153 Ok(None) => {}
1154 Err(e) => {
1155 if first_error.is_none() {
1156 first_error = Some(e);
1157 }
1158 }
1159 }
1160 }
1161
1162 match first_error {
1163 Some(e) => Err(e),
1164 None => Ok(None),
1165 }
1166}
1167
1168fn find_exact_systemd_cursor_in_file(
1169 file: &crate::file::JournalFile,
1170 sys: &crate::cursor::SystemdCursor,
1171) -> Result<Option<EntryMeta>> {
1172 if let Some(seqnum_id) = sys.seqnum_id
1173 && file.seqnum_id() != seqnum_id
1174 {
1175 return Ok(None);
1176 }
1177
1178 let iter = file.entry_iter_seek_realtime(false, sys.realtime_usec, None)?;
1179 for item in iter {
1180 let meta = item?;
1181
1182 if let Some(want_realtime) = sys.realtime_usec
1183 && meta.realtime_usec != want_realtime
1184 {
1185 continue;
1186 }
1187 if let Some(want_seqnum) = sys.seqnum
1188 && meta.seqnum != want_seqnum
1189 {
1190 continue;
1191 }
1192
1193 let fields = file.read_entry_cursor_fields(meta.entry_offset)?;
1194
1195 if let Some(want_realtime) = sys.realtime_usec
1196 && fields.realtime_usec != want_realtime
1197 {
1198 continue;
1199 }
1200 if let Some(want_seqnum) = sys.seqnum
1201 && fields.seqnum != want_seqnum
1202 {
1203 continue;
1204 }
1205 if let Some(want_boot_id) = sys.boot_id
1206 && fields.boot_id != want_boot_id
1207 {
1208 continue;
1209 }
1210 if let Some(want_monotonic) = sys.monotonic_usec
1211 && fields.monotonic_usec != want_monotonic
1212 {
1213 continue;
1214 }
1215 if let Some(want_xor) = sys.xor_hash
1216 && fields.xor_hash != want_xor
1217 {
1218 continue;
1219 }
1220
1221 return Ok(Some(meta));
1222 }
1223
1224 Ok(None)
1225}
1226
1227fn term_matches(entry: &EntryOwned, term: &MatchTerm) -> bool {
1228 match term {
1229 MatchTerm::Exact { field, value, .. } => entry
1230 .iter_fields()
1231 .any(|(k, v)| k == field.as_str() && v == value.as_slice()),
1232 MatchTerm::Present { field } => entry.get(field).is_some(),
1233 }
1234}
1235
#[cfg(test)]
mod tests {
    use super::*;
    use crate::JournalConfig;
    use crate::journal::JournalInner;
    use std::sync::Arc;

    /// Builds a journal with no roots and no files, using `config`.
    fn empty_journal_with_config(config: JournalConfig) -> Journal {
        let inner = JournalInner {
            config,
            roots: Vec::new(),
            files: Vec::new(),
        };
        Journal {
            inner: Arc::new(inner),
        }
    }

    /// A field name containing `=` is only reported once the query is run.
    #[test]
    fn invalid_field_name_rejected_on_iter() {
        let mut q = JournalQuery::new(empty_journal_with_config(JournalConfig::default()));
        q.match_exact("BAD=FIELD", b"x");

        let Err(err) = q.iter() else {
            panic!("expected InvalidQuery");
        };
        assert!(matches!(err, SdJournalError::InvalidQuery { .. }));
    }

    /// Going over `max_query_terms` is only reported once the query is run.
    #[test]
    fn too_many_terms_rejected_on_iter() {
        let cfg = JournalConfig {
            max_query_terms: 1,
            ..Default::default()
        };
        let mut q = JournalQuery::new(empty_journal_with_config(cfg));
        q.match_present("A").match_present("B");

        let Err(err) = q.iter() else {
            panic!("expected QueryTerms limit error");
        };
        assert!(matches!(
            err,
            SdJournalError::LimitExceeded {
                kind: LimitKind::QueryTerms,
                ..
            }
        ));
    }
}