Skip to main content

journal/
lib.rs

1//! Pure-Rust systemd journal reader and writer SDK.
2//!
3//! This crate provides a public Rust layer over the imported Netdata journal
4//! reader/writer crates. It intentionally keeps the low-level file parsing in
5//! the imported implementation and adds byte-safe entries, directory reading,
6//! export/JSON formatting, and a libsystemd-style facade.
7
8mod directory;
9mod explorer;
10mod export;
11mod facade;
12pub mod netdata;
13mod parse;
14mod reader_helpers;
15mod sealed_verify;
16mod verify_graph;
17
18pub use directory::DirectoryReader;
19pub use explorer::{
20    ExplorerAnchor, ExplorerComparison, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
21    ExplorerFtsPattern, ExplorerHistogram, ExplorerHistogramBucket, ExplorerProgress,
22    ExplorerQuery, ExplorerResult, ExplorerRow, ExplorerSampling, ExplorerStats,
23    ExplorerStopReason, ExplorerStrategy,
24};
25pub use export::{export_entry, export_entry_bytes, format_entry_text, json_entry};
26pub use parse::{ParseError, ParsedCursor, parse_cursor, parse_match_bytes, parse_match_string};
27pub use sealed_verify::{verify_file, verify_file_with_key};
28
29use ouroboros::self_referencing;
30use std::collections::HashMap;
31use std::fmt;
32use std::num::NonZeroU64;
33use std::path::{Path, PathBuf};
34
35use directory::DirectoryEntryKey;
36#[cfg(test)]
37use directory::is_journal_file_name;
38use reader_helpers::*;
39#[cfg(test)]
40use sealed_verify::{
41    COMPACT_DATA_OBJECT_HEADER_SIZE, DATA_OBJECT_HEADER_SIZE, HEADER_MIN_SIZE,
42    INCOMPATIBLE_COMPACT, OBJECT_HEADER_SIZE, OBJECT_TYPE_DATA, OBJECT_TYPE_TAG, align8,
43};
44
45pub use facade::{
46    ERR_END_OF_ENTRIES, ERR_INVALID_CURSOR, ERR_NO_ENTRY, ERR_UNSUPPORTED, Error as FacadeError,
47    OutputMode, SdJournal, SdJournalAddConjunction, SdJournalAddDisjunction, SdJournalAddMatch,
48    SdJournalClose, SdJournalEnumerateAvailableData, SdJournalEnumerateAvailableUnique,
49    SdJournalEnumerateField, SdJournalEnumerateFields, SdJournalFlushMatches, SdJournalGetCursor,
50    SdJournalGetData, SdJournalGetEntry, SdJournalGetMonotonicUsec, SdJournalGetRealtimeUsec,
51    SdJournalGetSeqnum, SdJournalListBoots, SdJournalNext, SdJournalNextSkip, SdJournalOpen,
52    SdJournalOpenDirectory, SdJournalOpenDirectoryWithOptions, SdJournalOpenFile,
53    SdJournalOpenFileWithOptions, SdJournalOpenFiles, SdJournalOpenFilesWithOptions,
54    SdJournalPrevious, SdJournalPreviousSkip, SdJournalProcessOutput, SdJournalQueryUnique,
55    SdJournalQueryUniqueState, SdJournalRestartData, SdJournalRestartFields,
56    SdJournalRestartUnique, SdJournalSeekCursor, SdJournalSeekHead, SdJournalSeekRealtimeUsec,
57    SdJournalSeekTail, SdJournalSetOutputMode, SdJournalTestCursor, SdJournalVisitUniqueValues,
58};
59pub use journal_core::error::JournalError;
60use journal_core::file::ExperimentalMmapStrategy;
61pub use journal_core::file::{
62    BucketUtilization, Compression, Direction, EntryItemsType, FieldNamePolicy, HashableObject,
63    JournalFile, JournalReader, Location, Mmap, WindowManagerStats,
64};
65use journal_core::file::{CurrentRowMetadata, CurrentRowView};
66pub use journal_log_writer::{
67    Config, EntryTimestamps, Log, LogLifecycleEvent, LogLifecycleObserver, RetentionPolicy,
68    RotationPolicy, WriterError,
69};
70pub use journal_registry::{Origin, Source};
71
72pub type Result<T> = std::result::Result<T, SdkError>;
73
74#[derive(Debug)]
75pub enum SdkError {
76    Journal(JournalError),
77    InvalidPath(String),
78    InvalidCursor(String),
79    NoEntry,
80    DecompressionFailed(String),
81    Unsupported(&'static str),
82    VerificationError(String),
83}
84
85impl fmt::Display for SdkError {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        match self {
88            Self::Journal(err) => write!(f, "{err}"),
89            Self::InvalidPath(path) => write!(f, "invalid path: {path}"),
90            Self::InvalidCursor(cursor) => write!(f, "invalid cursor: {cursor}"),
91            Self::NoEntry => write!(f, "no entry at current position"),
92            Self::DecompressionFailed(err) => write!(f, "decompression failed: {err}"),
93            Self::Unsupported(op) => write!(f, "unsupported operation: {op}"),
94            Self::VerificationError(msg) => {
95                write!(f, "journal verification failed: corrupt file: {msg}")
96            }
97        }
98    }
99}
100
101impl std::error::Error for SdkError {}
102
103impl From<JournalError> for SdkError {
104    fn from(err: JournalError) -> Self {
105        Self::Journal(err)
106    }
107}
108
109impl From<std::io::Error> for SdkError {
110    fn from(err: std::io::Error) -> Self {
111        Self::Journal(JournalError::Io(err))
112    }
113}
114
115#[derive(Debug, Clone)]
116pub struct Field {
117    pub name: String,
118    pub value: Vec<u8>,
119}
120
121impl Field {
122    pub fn new(name: &str, value: &str) -> Self {
123        Self {
124            name: name.to_string(),
125            value: value.as_bytes().to_vec(),
126        }
127    }
128
129    pub fn with_bytes(name: &str, value: Vec<u8>) -> Self {
130        Self {
131            name: name.to_string(),
132            value,
133        }
134    }
135
136    pub fn payload(&self) -> Vec<u8> {
137        let mut payload = Vec::with_capacity(self.name.len() + 1 + self.value.len());
138        payload.extend_from_slice(self.name.as_bytes());
139        payload.push(b'=');
140        payload.extend_from_slice(&self.value);
141        payload
142    }
143}
144
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub enum ReaderBounds {
147    /// Systemd-style mutable reader bounds.
148    ///
149    /// The reader keeps a cached file size and refreshes it only when a read
150    /// would go beyond the cached end of file, matching libsystemd's active
151    /// journal behavior without a metadata syscall on every object read.
152    Live,
153    /// Immutable reader bounds.
154    ///
155    /// The reader fixes the file size at open time, like
156    /// `SD_JOURNAL_ASSUME_IMMUTABLE`, for polling/query consumers that do not
157    /// need to observe appends during the current scan.
158    Snapshot,
159}
160
161pub const DEFAULT_READER_WINDOW_SIZE: u64 = 32 * 1024 * 1024;
162
163impl Default for ReaderBounds {
164    fn default() -> Self {
165        Self::Live
166    }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
170pub struct ReaderOptions {
171    pub window_size: u64,
172    pub bounds: ReaderBounds,
173    mmap_strategy: ExperimentalMmapStrategy,
174}
175
176impl Default for ReaderOptions {
177    fn default() -> Self {
178        Self {
179            window_size: DEFAULT_READER_WINDOW_SIZE,
180            bounds: ReaderBounds::Live,
181            mmap_strategy: ExperimentalMmapStrategy::Windowed,
182        }
183    }
184}
185
186impl ReaderOptions {
187    pub fn live() -> Self {
188        Self::default()
189    }
190
191    pub fn snapshot() -> Self {
192        Self {
193            bounds: ReaderBounds::Snapshot,
194            ..Self::default()
195        }
196    }
197
198    pub fn with_window_size(mut self, window_size: u64) -> Self {
199        self.window_size = window_size;
200        self
201    }
202
203    pub fn with_bounds(mut self, bounds: ReaderBounds) -> Self {
204        self.bounds = bounds;
205        self
206    }
207
208    #[doc(hidden)]
209    pub fn with_experimental_mmap_strategy(mut self, strategy: ExperimentalMmapStrategy) -> Self {
210        self.mmap_strategy = strategy;
211        self
212    }
213}
214
215#[derive(Debug, Clone, Copy, PartialEq, Eq)]
216pub struct RawField<'a> {
217    pub name: &'a [u8],
218    pub value: &'a [u8],
219}
220
221impl RawField<'_> {
222    pub fn payload(&self) -> Vec<u8> {
223        let mut payload = Vec::with_capacity(self.name.len() + 1 + self.value.len());
224        payload.extend_from_slice(self.name);
225        payload.push(b'=');
226        payload.extend_from_slice(self.value);
227        payload
228    }
229
230    pub fn name_str(&self) -> Option<&str> {
231        std::str::from_utf8(self.name).ok()
232    }
233}
234
235#[derive(Debug, Clone)]
236pub struct Entry {
237    /// Convenience map for UTF-8 field names. RAW-mode files may contain field
238    /// names that are not valid UTF-8; use `raw_fields()` or `get_raw_values()`
239    /// when byte-identical field-name identity matters.
240    pub fields: HashMap<String, Vec<u8>>,
241    /// Convenience repeated-value map for UTF-8 field names.
242    pub field_values: HashMap<String, Vec<Vec<u8>>>,
243    /// Full on-disk DATA payloads as `FIELD=value` bytes.
244    pub payloads: Vec<Vec<u8>>,
245    pub seqnum: u64,
246    pub realtime: u64,
247    pub monotonic: u64,
248    pub boot_id: [u8; 16],
249    pub cursor: String,
250}
251
252impl Entry {
253    pub fn get(&self, key: &str) -> Option<&[u8]> {
254        self.fields.get(key).map(Vec::as_slice)
255    }
256
257    pub fn get_str(&self, key: &str) -> Option<&str> {
258        self.get(key)
259            .and_then(|value| std::str::from_utf8(value).ok())
260    }
261
262    pub fn raw_fields(&self) -> impl Iterator<Item = RawField<'_>> {
263        self.payloads
264            .iter()
265            .filter_map(|payload| split_raw_payload(payload))
266    }
267
268    pub fn get_raw(&self, key: &[u8]) -> Option<&[u8]> {
269        self.raw_fields()
270            .find(|field| field.name == key)
271            .map(|field| field.value)
272    }
273
274    pub fn get_raw_values(&self, key: &[u8]) -> Vec<&[u8]> {
275        self.raw_fields()
276            .filter_map(|field| (field.name == key).then_some(field.value))
277            .collect()
278    }
279}
280
281fn split_raw_payload(payload: &[u8]) -> Option<RawField<'_>> {
282    let eq = payload.iter().position(|byte| *byte == b'=')?;
283    Some(RawField {
284        name: &payload[..eq],
285        value: &payload[eq + 1..],
286    })
287}
288
289#[derive(Debug, Clone)]
290pub struct BootInfo {
291    pub index: i64,
292    pub boot_id: String,
293    pub first_entry: i64,
294    pub last_entry: i64,
295}
296
297#[derive(Debug, Clone, Copy)]
298pub struct FileHeader {
299    pub signature: [u8; 8],
300    pub compatible_flags: u32,
301    pub incompatible_flags: u32,
302    pub state: u8,
303    pub header_size: u64,
304    pub n_entries: u64,
305    pub head_entry_realtime: u64,
306    pub tail_entry_realtime: u64,
307    pub head_entry_seqnum: u64,
308    pub tail_entry_seqnum: u64,
309    pub tail_entry_boot_id: [u8; 16],
310    pub seqnum_id: [u8; 16],
311}
312
313#[derive(Debug, Clone, Copy)]
314pub(crate) struct FileHeaderSnapshot {
315    pub(crate) header: FileHeader,
316    pub(crate) machine_id: [u8; 16],
317    pub(crate) tail_entry_monotonic: u64,
318}
319
320impl FileHeaderSnapshot {
321    fn from_file(file: &JournalFile<Mmap>) -> Self {
322        let header = file.journal_header_ref();
323        Self {
324            header: FileHeader {
325                signature: header.signature,
326                compatible_flags: header.compatible_flags,
327                incompatible_flags: header.incompatible_flags,
328                state: header.state,
329                header_size: header.header_size,
330                n_entries: header.n_entries,
331                head_entry_realtime: header.head_entry_realtime,
332                tail_entry_realtime: header.tail_entry_realtime,
333                head_entry_seqnum: header.head_entry_seqnum,
334                tail_entry_seqnum: header.tail_entry_seqnum,
335                tail_entry_boot_id: header.tail_entry_boot_id,
336                seqnum_id: header.seqnum_id,
337            },
338            machine_id: header.machine_id,
339            tail_entry_monotonic: header.tail_entry_monotonic,
340        }
341    }
342}
343
344#[self_referencing]
345struct ReaderCell {
346    file: JournalFile<Mmap>,
347    #[borrows(file)]
348    #[not_covariant]
349    reader: JournalReader<'this, Mmap>,
350}
351
352pub struct FileReader {
353    inner: ReaderCell,
354    temp_path: Option<PathBuf>,
355    row: CurrentRowView,
356    header_snapshot: FileHeaderSnapshot,
357    bounds: ReaderBounds,
358}
359
360fn key_from_metadata(metadata: CurrentRowMetadata) -> DirectoryEntryKey {
361    DirectoryEntryKey {
362        seqnum_id: metadata.seqnum_id,
363        seqnum: metadata.seqnum,
364        boot_id: metadata.boot_id,
365        monotonic: metadata.monotonic,
366        realtime: metadata.realtime,
367        xor_hash: metadata.xor_hash,
368    }
369}
370
371enum StepStatus {
372    Valid,
373    Skip,
374    End,
375}
376
377impl Drop for FileReader {
378    fn drop(&mut self) {
379        self.inner
380            .with_file(|file| self.row.clear_current_best_effort(file));
381        if let Some(path) = &self.temp_path {
382            let _ = std::fs::remove_file(path);
383        }
384    }
385}
386
387impl FileReader {
388    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
389        Self::open_with_options(path, ReaderOptions::default())
390    }
391
392    pub fn open_with_options(path: impl AsRef<Path>, options: ReaderOptions) -> Result<Self> {
393        let path = path.as_ref();
394        if is_zst_file(path) {
395            return Self::open_zst(path, options);
396        }
397
398        let file = open_journal_file(path, options)?;
399        let header_snapshot = FileHeaderSnapshot::from_file(&file);
400        Ok(Self {
401            inner: ReaderCellBuilder {
402                file,
403                reader_builder: |_file| JournalReader::default(),
404            }
405            .build(),
406            temp_path: None,
407            row: CurrentRowView::default(),
408            header_snapshot,
409            bounds: options.bounds,
410        })
411    }
412
413    fn open_zst(path: &Path, options: ReaderOptions) -> Result<Self> {
414        let temp_path = decompress_zst_to_temp(path, "rust-sdk-journal")?;
415        let file = match open_journal_file(&temp_path, options) {
416            Ok(file) => file,
417            Err(err) => {
418                let _ = std::fs::remove_file(&temp_path);
419                return Err(err);
420            }
421        };
422        let header_snapshot = FileHeaderSnapshot::from_file(&file);
423        Ok(Self {
424            inner: ReaderCellBuilder {
425                file,
426                reader_builder: |_file| JournalReader::default(),
427            }
428            .build(),
429            temp_path: Some(temp_path),
430            row: CurrentRowView::default(),
431            header_snapshot,
432            bounds: options.bounds,
433        })
434    }
435
436    pub fn header(&self) -> FileHeader {
437        if self.bounds == ReaderBounds::Snapshot {
438            return self.header_snapshot.header;
439        }
440        self.live_header()
441    }
442
443    pub(crate) fn cached_header(&self) -> FileHeaderSnapshot {
444        self.header_snapshot
445    }
446
447    fn live_header(&self) -> FileHeader {
448        self.inner
449            .with_file(|file| FileHeaderSnapshot::from_file(file).header)
450    }
451
452    pub fn bucket_utilization(&self) -> Option<BucketUtilization> {
453        self.inner.with_file(JournalFile::bucket_utilization)
454    }
455
456    #[doc(hidden)]
457    pub fn mmap_stats(&self) -> Result<WindowManagerStats> {
458        self.inner
459            .with_file(|file| file.mmap_stats())
460            .map_err(Into::into)
461    }
462
463    pub fn seek_head(&mut self) {
464        self.inner
465            .with_file(|file| self.row.clear_current_best_effort(file));
466        self.inner.with_reader_mut(|reader| {
467            reader.set_location(Location::Head);
468        });
469    }
470
471    pub fn seek_tail(&mut self) {
472        self.inner
473            .with_file(|file| self.row.clear_current_best_effort(file));
474        self.inner.with_reader_mut(|reader| {
475            reader.set_location(Location::Tail);
476        });
477    }
478
479    pub fn seek_realtime(&mut self, usec: u64) {
480        self.inner
481            .with_file(|file| self.row.clear_current_best_effort(file));
482        self.inner.with_reader_mut(|reader| {
483            reader.set_location(Location::Realtime(usec));
484        });
485    }
486
487    pub fn seek_cursor(&mut self, cursor: &str) -> Result<()> {
488        let (seqnum_id, boot_id, realtime, seqnum) =
489            parse_cursor(cursor).map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
490        self.seek_realtime(realtime);
491        while self.next()? {
492            let entry = self.get_entry()?;
493            if entry.realtime > realtime {
494                break;
495            }
496            if entry.realtime != realtime
497                || entry.seqnum != seqnum
498                || hex::encode(entry.boot_id) != boot_id
499            {
500                continue;
501            }
502            let current_cursor = self.get_cursor()?;
503            let (current_seqnum_id, _, _, _) = parse_cursor(&current_cursor)
504                .map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
505            if current_seqnum_id == seqnum_id {
506                return Ok(());
507            }
508        }
509        Ok(())
510    }
511
512    pub fn next(&mut self) -> Result<bool> {
513        self.step_valid(Direction::Forward)
514    }
515
516    pub fn previous(&mut self) -> Result<bool> {
517        self.step_valid(Direction::Backward)
518    }
519
520    fn step_valid(&mut self, direction: Direction) -> Result<bool> {
521        self.inner
522            .with_file(|file| self.row.clear_current(file))
523            .map_err(SdkError::from)?;
524        loop {
525            let row = &mut self.row;
526            let status = self.inner.with_mut(|fields| {
527                if !fields.reader.step(fields.file, direction)? {
528                    return Ok(StepStatus::End);
529                }
530
531                match fields
532                    .reader
533                    .get_entry_offset()
534                    .and_then(|offset| row.load_entry(fields.file, offset))
535                {
536                    Ok(_) => Ok(StepStatus::Valid),
537                    Err(err) if recoverable_entry_error(&err) => Ok(StepStatus::Skip),
538                    Err(err) => Err(err),
539                }
540            })?;
541
542            match status {
543                StepStatus::Valid => {
544                    return Ok(true);
545                }
546                StepStatus::Skip => continue,
547                StepStatus::End => {
548                    self.inner
549                        .with_file(|file| self.row.clear_current(file))
550                        .map_err(SdkError::from)?;
551                    return Ok(false);
552                }
553            }
554        }
555    }
556
557    pub fn get_entry(&mut self) -> Result<Entry> {
558        self.invalidate_entry_data_state();
559        let inner = &mut self.inner;
560        let row = &mut self.row;
561        inner.with_mut(|fields| {
562            if row.entry_offset().is_none() {
563                let offset = fields.reader.get_entry_offset()?;
564                row.load_entry(fields.file, offset)?;
565            }
566            read_current_row_entry(fields.file, row)
567        })
568    }
569
570    pub fn visit_entry_payloads<F>(&mut self, mut visitor: F) -> Result<()>
571    where
572        F: FnMut(&[u8]) -> Result<()>,
573    {
574        self.invalidate_entry_data_state();
575        let inner = &mut self.inner;
576        let row = &mut self.row;
577        inner.with_mut(|fields| {
578            fields.reader.release_object_guards();
579            if row.entry_offset().is_none() {
580                let offset = fields.reader.get_entry_offset()?;
581                row.load_entry(fields.file, offset)?;
582            }
583            row.restart_data()?;
584            loop {
585                let payload = match row.read_next_payload(fields.file) {
586                    Ok(Some(payload)) => payload,
587                    Ok(None) => break,
588                    Err(err) if recoverable_entry_data_error(&err) => continue,
589                    Err(err) => {
590                        let _ = row.reset_data_state(fields.file);
591                        return Err(err.into());
592                    }
593                };
594                let payload = row.payload_slice(payload);
595                if let Err(err) = visitor(payload) {
596                    let _ = row.reset_data_state(fields.file);
597                    return Err(err);
598                }
599            }
600            row.reset_data_state(fields.file)?;
601            Ok(())
602        })
603    }
604
605    pub fn clear_entry_data_state(&mut self) {
606        self.inner
607            .with_file(|file| self.row.reset_data_state_best_effort(file));
608        self.inner
609            .with_reader_mut(|reader| reader.entry_data_restart());
610    }
611
612    fn invalidate_entry_data_state(&mut self) {
613        if self.row.data_state_active() {
614            self.clear_entry_data_state();
615        }
616    }
617
618    pub fn entry_data_restart(&mut self) -> Result<()> {
619        self.inner
620            .with_file(|file| self.row.clear_pins(file))
621            .map_err(SdkError::from)?;
622        self.inner
623            .with_reader_mut(|reader| reader.entry_data_restart());
624        if self.row.entry_offset().is_none() {
625            let row = &mut self.row;
626            self.inner.with_mut(|fields| {
627                let offset = fields.reader.get_entry_offset()?;
628                row.load_entry(fields.file, offset).map(|_| ())
629            })?;
630        }
631        self.row.restart_data().map_err(Into::into)
632    }
633
634    pub fn enumerate_entry_payload(&mut self) -> Result<Option<&[u8]>> {
635        let row = &mut self.row;
636        let payload = self.inner.with_mut(|fields| {
637            fields.reader.release_object_guards();
638            row.read_next_payload(fields.file)
639        })?;
640        Ok(payload.map(|payload| self.row.payload_slice(payload)))
641    }
642
643    pub fn collect_entry_payloads(&mut self, payloads: &mut Vec<Vec<u8>>) -> Result<()> {
644        payloads.clear();
645        self.visit_entry_payloads(|payload| {
646            payloads.push(payload.to_vec());
647            Ok(())
648        })
649    }
650
651    pub fn get_entry_payload(&mut self, field: &[u8]) -> Result<Option<Vec<u8>>> {
652        let mut found = None;
653        self.visit_entry_payloads(|payload| {
654            if found.is_none()
655                && payload.len() > field.len()
656                && payload.starts_with(field)
657                && payload[field.len()] == b'='
658            {
659                found = Some(payload.to_vec());
660            }
661            Ok(())
662        })?;
663        Ok(found)
664    }
665
666    pub fn get_realtime_usec(&self) -> Result<u64> {
667        if let Some(metadata) = self.row.metadata() {
668            return Ok(metadata.realtime);
669        }
670        self.inner
671            .with(|fields| fields.reader.get_realtime_usec(fields.file))
672            .map_err(Into::into)
673    }
674
675    pub fn get_seqnum(&self) -> Result<(u64, [u8; 16])> {
676        let key = self.current_directory_entry_key()?;
677        Ok((key.seqnum, key.seqnum_id))
678    }
679
680    pub fn get_monotonic_usec(&self) -> Result<(u64, [u8; 16])> {
681        let key = self.current_directory_entry_key()?;
682        Ok((key.monotonic, key.boot_id))
683    }
684
685    pub fn get_cursor(&self) -> Result<String> {
686        if let Some(metadata) = self.row.metadata() {
687            return Ok(format_cursor_from_key(key_from_metadata(metadata)));
688        }
689        let seqnum_id = self.header_snapshot.header.seqnum_id;
690        self.inner
691            .with(|fields| build_cursor(fields.file, fields.reader, seqnum_id))
692    }
693
694    fn current_directory_entry_key(&self) -> Result<DirectoryEntryKey> {
695        if let Some(metadata) = self.row.metadata() {
696            return Ok(key_from_metadata(metadata));
697        }
698        self.inner.with(|fields| {
699            let offset = fields.reader.get_entry_offset()?;
700            let entry = fields.file.entry_ref(offset)?;
701            Ok(DirectoryEntryKey {
702                seqnum_id: self.header_snapshot.header.seqnum_id,
703                seqnum: entry.header.seqnum,
704                boot_id: entry.header.boot_id,
705                monotonic: entry.header.monotonic,
706                realtime: entry.header.realtime,
707                xor_hash: entry.header.xor_hash,
708            })
709        })
710    }
711
712    pub fn test_cursor(&self, cursor: &str) -> Result<bool> {
713        Ok(self.get_cursor()? == cursor)
714    }
715
716    pub fn add_match(&mut self, data: &[u8]) {
717        self.inner.with_reader_mut(|reader| reader.add_match(data));
718    }
719
720    pub fn add_conjunction(&mut self) -> Result<()> {
721        self.inner
722            .with_mut(|fields| fields.reader.add_conjunction(fields.file))
723            .map_err(Into::into)
724    }
725
726    pub fn add_disjunction(&mut self) -> Result<()> {
727        self.inner
728            .with_mut(|fields| fields.reader.add_disjunction(fields.file))
729            .map_err(Into::into)
730    }
731
732    pub fn flush_matches(&mut self) {
733        self.inner.with_reader_mut(|reader| reader.flush_matches());
734    }
735}
736
737impl FileReader {
738    fn header_realtime_start(&self) -> u64 {
739        self.header_snapshot.header.head_entry_realtime
740    }
741
742    pub fn enumerate_fields(&mut self) -> Result<Vec<String>> {
743        self.invalidate_entry_data_state();
744        match self.enumerate_fields_indexed() {
745            Ok(fields) => Ok(fields),
746            Err(_) => enumerate_file_fields_by_scan(self),
747        }
748    }
749
750    pub(crate) fn enumerate_fields_indexed(&mut self) -> Result<Vec<String>> {
751        self.invalidate_entry_data_state();
752        self.inner.with_file(enumerate_file_fields_indexed)
753    }
754
755    pub fn query_unique(&mut self, field_name: &str) -> Result<Vec<Vec<u8>>> {
756        let mut out = Vec::new();
757        self.visit_unique_values(field_name, |value| {
758            out.push(value.to_vec());
759            Ok(())
760        })?;
761        Ok(out)
762    }
763
764    pub fn visit_unique_values<F>(&mut self, field_name: &str, visitor: F) -> Result<()>
765    where
766        F: FnMut(&[u8]) -> Result<()>,
767    {
768        self.invalidate_entry_data_state();
769        let decompressed = self.row.decompressed_mut();
770        self.inner.with_file(|file| {
771            visit_file_unique_values_indexed(file, field_name.as_bytes(), decompressed, visitor)
772        })
773    }
774}
775
776#[cfg(test)]
777mod tests;