Skip to main content

journal/
lib.rs

1//! Pure-Rust systemd journal reader and writer SDK.
2//!
3//! This crate provides a public Rust layer over the imported Netdata journal
4//! reader/writer crates. It intentionally keeps the low-level file parsing in
5//! the imported implementation and adds byte-safe entries, directory reading,
6//! export/JSON formatting, and a libsystemd-style facade.
7
8mod directory;
9mod explorer;
10mod export;
11mod facade;
12pub mod netdata;
13mod parse;
14mod reader_helpers;
15mod sealed_verify;
16mod verify_graph;
17
18pub use directory::DirectoryReader;
19pub use explorer::{
20    ExplorerAnchor, ExplorerComparison, ExplorerControl, ExplorerFieldMode, ExplorerFilter,
21    ExplorerFtsPattern, ExplorerHistogram, ExplorerHistogramBucket, ExplorerProgress,
22    ExplorerQuery, ExplorerResult, ExplorerRow, ExplorerSampling, ExplorerStats,
23    ExplorerStopReason, ExplorerStrategy,
24};
25pub use export::{export_entry, export_entry_bytes, format_entry_text, json_entry};
26pub use parse::{ParseError, ParsedCursor, parse_cursor, parse_match_bytes, parse_match_string};
27pub use sealed_verify::{verify_file, verify_file_with_key};
28
29use ouroboros::self_referencing;
30use std::collections::HashMap;
31use std::fmt;
32use std::num::NonZeroU64;
33use std::path::{Path, PathBuf};
34
35use directory::DirectoryEntryKey;
36#[cfg(test)]
37use directory::is_journal_file_name;
38use reader_helpers::*;
39#[cfg(test)]
40use sealed_verify::{
41    COMPACT_DATA_OBJECT_HEADER_SIZE, DATA_OBJECT_HEADER_SIZE, HEADER_MIN_SIZE,
42    INCOMPATIBLE_COMPACT, OBJECT_HEADER_SIZE, OBJECT_TYPE_DATA, OBJECT_TYPE_TAG, align8,
43};
44
45pub use facade::{
46    ERR_END_OF_ENTRIES, ERR_INVALID_CURSOR, ERR_NO_ENTRY, ERR_UNSUPPORTED, Error as FacadeError,
47    OutputMode, SdJournal, SdJournalAddConjunction, SdJournalAddDisjunction, SdJournalAddMatch,
48    SdJournalClose, SdJournalEnumerateAvailableData, SdJournalEnumerateAvailableUnique,
49    SdJournalEnumerateField, SdJournalEnumerateFields, SdJournalFlushMatches, SdJournalGetCursor,
50    SdJournalGetData, SdJournalGetEntry, SdJournalGetMonotonicUsec, SdJournalGetRealtimeUsec,
51    SdJournalGetSeqnum, SdJournalListBoots, SdJournalNext, SdJournalNextSkip, SdJournalOpen,
52    SdJournalOpenDirectory, SdJournalOpenDirectoryWithOptions, SdJournalOpenFile,
53    SdJournalOpenFileWithOptions, SdJournalOpenFiles, SdJournalOpenFilesWithOptions,
54    SdJournalPrevious, SdJournalPreviousSkip, SdJournalProcessOutput, SdJournalQueryUnique,
55    SdJournalQueryUniqueState, SdJournalRestartData, SdJournalRestartFields,
56    SdJournalRestartUnique, SdJournalSeekCursor, SdJournalSeekHead, SdJournalSeekRealtimeUsec,
57    SdJournalSeekTail, SdJournalSetOutputMode, SdJournalTestCursor, SdJournalVisitUniqueValues,
58};
59pub use journal_core::error::JournalError;
60pub use journal_core::file::{
61    BucketUtilization, Compression, Direction, EntryItemsType, ExperimentalMmapStrategy,
62    FieldNamePolicy, HashableObject, JournalFile, JournalReader, Location, Mmap,
63    WindowManagerStats,
64};
65use journal_core::file::{CurrentRowMetadata, CurrentRowView};
66pub use journal_log_writer::{
67    Config, EntryTimestamps, Log, LogLifecycleEvent, LogLifecycleObserver, RetentionPolicy,
68    RotationPolicy, WriterError,
69};
70pub use journal_registry::{Origin, Source};
71
72pub type Result<T> = std::result::Result<T, SdkError>;
73
74#[derive(Debug)]
75pub enum SdkError {
76    Journal(JournalError),
77    InvalidPath(String),
78    InvalidCursor(String),
79    NoEntry,
80    DecompressionFailed(String),
81    Unsupported(&'static str),
82    VerificationError(String),
83}
84
85impl fmt::Display for SdkError {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        match self {
88            Self::Journal(err) => write!(f, "{err}"),
89            Self::InvalidPath(path) => write!(f, "invalid path: {path}"),
90            Self::InvalidCursor(cursor) => write!(f, "invalid cursor: {cursor}"),
91            Self::NoEntry => write!(f, "no entry at current position"),
92            Self::DecompressionFailed(err) => write!(f, "decompression failed: {err}"),
93            Self::Unsupported(op) => write!(f, "unsupported operation: {op}"),
94            Self::VerificationError(msg) => {
95                write!(f, "journal verification failed: corrupt file: {msg}")
96            }
97        }
98    }
99}
100
101impl std::error::Error for SdkError {}
102
103impl From<JournalError> for SdkError {
104    fn from(err: JournalError) -> Self {
105        Self::Journal(err)
106    }
107}
108
109impl From<std::io::Error> for SdkError {
110    fn from(err: std::io::Error) -> Self {
111        Self::Journal(JournalError::Io(err))
112    }
113}
114
115#[derive(Debug, Clone)]
116pub struct Field {
117    pub name: String,
118    pub value: Vec<u8>,
119}
120
121impl Field {
122    pub fn new(name: &str, value: &str) -> Self {
123        Self {
124            name: name.to_string(),
125            value: value.as_bytes().to_vec(),
126        }
127    }
128
129    pub fn with_bytes(name: &str, value: Vec<u8>) -> Self {
130        Self {
131            name: name.to_string(),
132            value,
133        }
134    }
135
136    pub fn payload(&self) -> Vec<u8> {
137        let mut payload = Vec::with_capacity(self.name.len() + 1 + self.value.len());
138        payload.extend_from_slice(self.name.as_bytes());
139        payload.push(b'=');
140        payload.extend_from_slice(&self.value);
141        payload
142    }
143}
144
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub enum ReaderBounds {
147    /// Systemd-style mutable reader bounds.
148    ///
149    /// The reader keeps a cached file size and refreshes it only when a read
150    /// would go beyond the cached end of file, matching libsystemd's active
151    /// journal behavior without a metadata syscall on every object read.
152    Live,
153    /// Immutable reader bounds.
154    ///
155    /// The reader fixes the file size at open time, like
156    /// `SD_JOURNAL_ASSUME_IMMUTABLE`, for polling/query consumers that do not
157    /// need to observe appends during the current scan.
158    Snapshot,
159}
160
161pub const DEFAULT_READER_WINDOW_SIZE: u64 = 32 * 1024 * 1024;
162
163impl Default for ReaderBounds {
164    fn default() -> Self {
165        Self::Live
166    }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
170pub struct ReaderOptions {
171    pub window_size: u64,
172    pub bounds: ReaderBounds,
173    pub mmap_strategy: ExperimentalMmapStrategy,
174}
175
176impl Default for ReaderOptions {
177    fn default() -> Self {
178        Self {
179            window_size: DEFAULT_READER_WINDOW_SIZE,
180            bounds: ReaderBounds::Live,
181            mmap_strategy: ExperimentalMmapStrategy::Windowed,
182        }
183    }
184}
185
186impl ReaderOptions {
187    pub fn live() -> Self {
188        Self::default()
189    }
190
191    pub fn snapshot() -> Self {
192        Self {
193            bounds: ReaderBounds::Snapshot,
194            ..Self::default()
195        }
196    }
197
198    pub fn with_window_size(mut self, window_size: u64) -> Self {
199        self.window_size = window_size;
200        self
201    }
202
203    pub fn with_mmap_strategy(mut self, strategy: ExperimentalMmapStrategy) -> Self {
204        self.mmap_strategy = strategy;
205        self
206    }
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq)]
210pub struct RawField<'a> {
211    pub name: &'a [u8],
212    pub value: &'a [u8],
213}
214
215impl RawField<'_> {
216    pub fn payload(&self) -> Vec<u8> {
217        let mut payload = Vec::with_capacity(self.name.len() + 1 + self.value.len());
218        payload.extend_from_slice(self.name);
219        payload.push(b'=');
220        payload.extend_from_slice(self.value);
221        payload
222    }
223
224    pub fn name_str(&self) -> Option<&str> {
225        std::str::from_utf8(self.name).ok()
226    }
227}
228
229#[derive(Debug, Clone)]
230pub struct Entry {
231    /// Convenience map for UTF-8 field names. RAW-mode files may contain field
232    /// names that are not valid UTF-8; use `raw_fields()` or `get_raw_values()`
233    /// when byte-identical field-name identity matters.
234    pub fields: HashMap<String, Vec<u8>>,
235    /// Convenience repeated-value map for UTF-8 field names.
236    pub field_values: HashMap<String, Vec<Vec<u8>>>,
237    /// Full on-disk DATA payloads as `FIELD=value` bytes.
238    pub payloads: Vec<Vec<u8>>,
239    pub seqnum: u64,
240    pub realtime: u64,
241    pub monotonic: u64,
242    pub boot_id: [u8; 16],
243    pub cursor: String,
244}
245
246impl Entry {
247    pub fn get(&self, key: &str) -> Option<&[u8]> {
248        self.fields.get(key).map(Vec::as_slice)
249    }
250
251    pub fn get_str(&self, key: &str) -> Option<&str> {
252        self.get(key)
253            .and_then(|value| std::str::from_utf8(value).ok())
254    }
255
256    pub fn raw_fields(&self) -> impl Iterator<Item = RawField<'_>> {
257        self.payloads
258            .iter()
259            .filter_map(|payload| split_raw_payload(payload))
260    }
261
262    pub fn get_raw(&self, key: &[u8]) -> Option<&[u8]> {
263        self.raw_fields()
264            .find(|field| field.name == key)
265            .map(|field| field.value)
266    }
267
268    pub fn get_raw_values(&self, key: &[u8]) -> Vec<&[u8]> {
269        self.raw_fields()
270            .filter_map(|field| (field.name == key).then_some(field.value))
271            .collect()
272    }
273}
274
275fn split_raw_payload(payload: &[u8]) -> Option<RawField<'_>> {
276    let eq = payload.iter().position(|byte| *byte == b'=')?;
277    Some(RawField {
278        name: &payload[..eq],
279        value: &payload[eq + 1..],
280    })
281}
282
283#[derive(Debug, Clone)]
284pub struct BootInfo {
285    pub index: i64,
286    pub boot_id: String,
287    pub first_entry: i64,
288    pub last_entry: i64,
289}
290
291#[derive(Debug, Clone, Copy)]
292pub struct FileHeader {
293    pub signature: [u8; 8],
294    pub compatible_flags: u32,
295    pub incompatible_flags: u32,
296    pub state: u8,
297    pub header_size: u64,
298    pub n_entries: u64,
299    pub head_entry_realtime: u64,
300    pub tail_entry_realtime: u64,
301    pub head_entry_seqnum: u64,
302    pub tail_entry_seqnum: u64,
303    pub tail_entry_boot_id: [u8; 16],
304    pub seqnum_id: [u8; 16],
305}
306
307#[derive(Debug, Clone, Copy)]
308pub(crate) struct FileHeaderSnapshot {
309    pub(crate) header: FileHeader,
310    pub(crate) machine_id: [u8; 16],
311    pub(crate) tail_entry_monotonic: u64,
312}
313
314impl FileHeaderSnapshot {
315    fn from_file(file: &JournalFile<Mmap>) -> Self {
316        let header = file.journal_header_ref();
317        Self {
318            header: FileHeader {
319                signature: header.signature,
320                compatible_flags: header.compatible_flags,
321                incompatible_flags: header.incompatible_flags,
322                state: header.state,
323                header_size: header.header_size,
324                n_entries: header.n_entries,
325                head_entry_realtime: header.head_entry_realtime,
326                tail_entry_realtime: header.tail_entry_realtime,
327                head_entry_seqnum: header.head_entry_seqnum,
328                tail_entry_seqnum: header.tail_entry_seqnum,
329                tail_entry_boot_id: header.tail_entry_boot_id,
330                seqnum_id: header.seqnum_id,
331            },
332            machine_id: header.machine_id,
333            tail_entry_monotonic: header.tail_entry_monotonic,
334        }
335    }
336}
337
338#[self_referencing]
339struct ReaderCell {
340    file: JournalFile<Mmap>,
341    #[borrows(file)]
342    #[not_covariant]
343    reader: JournalReader<'this, Mmap>,
344}
345
346pub struct FileReader {
347    inner: ReaderCell,
348    temp_path: Option<PathBuf>,
349    row: CurrentRowView,
350    header_snapshot: FileHeaderSnapshot,
351    bounds: ReaderBounds,
352}
353
354fn key_from_metadata(metadata: CurrentRowMetadata) -> DirectoryEntryKey {
355    DirectoryEntryKey {
356        seqnum_id: metadata.seqnum_id,
357        seqnum: metadata.seqnum,
358        boot_id: metadata.boot_id,
359        monotonic: metadata.monotonic,
360        realtime: metadata.realtime,
361        xor_hash: metadata.xor_hash,
362    }
363}
364
365enum StepStatus {
366    Valid,
367    Skip,
368    End,
369}
370
371impl Drop for FileReader {
372    fn drop(&mut self) {
373        self.inner
374            .with_file(|file| self.row.clear_current_best_effort(file));
375        if let Some(path) = &self.temp_path {
376            let _ = std::fs::remove_file(path);
377        }
378    }
379}
380
381impl FileReader {
382    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
383        Self::open_with_options(path, ReaderOptions::default())
384    }
385
386    pub fn open_with_options(path: impl AsRef<Path>, options: ReaderOptions) -> Result<Self> {
387        let path = path.as_ref();
388        if is_zst_file(path) {
389            return Self::open_zst(path, options);
390        }
391
392        let file = open_journal_file(path, options)?;
393        let header_snapshot = FileHeaderSnapshot::from_file(&file);
394        Ok(Self {
395            inner: ReaderCellBuilder {
396                file,
397                reader_builder: |_file| JournalReader::default(),
398            }
399            .build(),
400            temp_path: None,
401            row: CurrentRowView::default(),
402            header_snapshot,
403            bounds: options.bounds,
404        })
405    }
406
407    fn open_zst(path: &Path, options: ReaderOptions) -> Result<Self> {
408        let temp_path = decompress_zst_to_temp(path, "rust-sdk-journal")?;
409        let file = match open_journal_file(&temp_path, options) {
410            Ok(file) => file,
411            Err(err) => {
412                let _ = std::fs::remove_file(&temp_path);
413                return Err(err);
414            }
415        };
416        let header_snapshot = FileHeaderSnapshot::from_file(&file);
417        Ok(Self {
418            inner: ReaderCellBuilder {
419                file,
420                reader_builder: |_file| JournalReader::default(),
421            }
422            .build(),
423            temp_path: Some(temp_path),
424            row: CurrentRowView::default(),
425            header_snapshot,
426            bounds: options.bounds,
427        })
428    }
429
430    pub fn header(&self) -> FileHeader {
431        if self.bounds == ReaderBounds::Snapshot {
432            return self.header_snapshot.header;
433        }
434        self.live_header()
435    }
436
437    pub(crate) fn cached_header(&self) -> FileHeaderSnapshot {
438        self.header_snapshot
439    }
440
441    fn live_header(&self) -> FileHeader {
442        self.inner
443            .with_file(|file| FileHeaderSnapshot::from_file(file).header)
444    }
445
446    pub fn bucket_utilization(&self) -> Option<BucketUtilization> {
447        self.inner.with_file(JournalFile::bucket_utilization)
448    }
449
450    #[doc(hidden)]
451    pub fn mmap_stats(&self) -> Result<WindowManagerStats> {
452        self.inner
453            .with_file(|file| file.mmap_stats())
454            .map_err(Into::into)
455    }
456
457    pub fn seek_head(&mut self) {
458        self.inner
459            .with_file(|file| self.row.clear_current_best_effort(file));
460        self.inner.with_reader_mut(|reader| {
461            reader.set_location(Location::Head);
462        });
463    }
464
465    pub fn seek_tail(&mut self) {
466        self.inner
467            .with_file(|file| self.row.clear_current_best_effort(file));
468        self.inner.with_reader_mut(|reader| {
469            reader.set_location(Location::Tail);
470        });
471    }
472
473    pub fn seek_realtime(&mut self, usec: u64) {
474        self.inner
475            .with_file(|file| self.row.clear_current_best_effort(file));
476        self.inner.with_reader_mut(|reader| {
477            reader.set_location(Location::Realtime(usec));
478        });
479    }
480
481    pub fn seek_cursor(&mut self, cursor: &str) -> Result<()> {
482        let (seqnum_id, boot_id, realtime, seqnum) =
483            parse_cursor(cursor).map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
484        self.seek_realtime(realtime);
485        while self.next()? {
486            let entry = self.get_entry()?;
487            if entry.realtime > realtime {
488                break;
489            }
490            if entry.realtime != realtime
491                || entry.seqnum != seqnum
492                || hex::encode(entry.boot_id) != boot_id
493            {
494                continue;
495            }
496            let current_cursor = self.get_cursor()?;
497            let (current_seqnum_id, _, _, _) = parse_cursor(&current_cursor)
498                .map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
499            if current_seqnum_id == seqnum_id {
500                return Ok(());
501            }
502        }
503        Ok(())
504    }
505
506    pub fn next(&mut self) -> Result<bool> {
507        self.step_valid(Direction::Forward)
508    }
509
510    pub fn previous(&mut self) -> Result<bool> {
511        self.step_valid(Direction::Backward)
512    }
513
514    fn step_valid(&mut self, direction: Direction) -> Result<bool> {
515        self.inner
516            .with_file(|file| self.row.clear_current(file))
517            .map_err(SdkError::from)?;
518        loop {
519            let row = &mut self.row;
520            let status = self.inner.with_mut(|fields| {
521                if !fields.reader.step(fields.file, direction)? {
522                    return Ok(StepStatus::End);
523                }
524
525                match fields
526                    .reader
527                    .get_entry_offset()
528                    .and_then(|offset| row.load_entry(fields.file, offset))
529                {
530                    Ok(_) => Ok(StepStatus::Valid),
531                    Err(err) if recoverable_entry_error(&err) => Ok(StepStatus::Skip),
532                    Err(err) => Err(err),
533                }
534            })?;
535
536            match status {
537                StepStatus::Valid => {
538                    return Ok(true);
539                }
540                StepStatus::Skip => continue,
541                StepStatus::End => {
542                    self.inner
543                        .with_file(|file| self.row.clear_current(file))
544                        .map_err(SdkError::from)?;
545                    return Ok(false);
546                }
547            }
548        }
549    }
550
551    pub fn get_entry(&mut self) -> Result<Entry> {
552        self.invalidate_entry_data_state();
553        let inner = &mut self.inner;
554        let row = &mut self.row;
555        inner.with_mut(|fields| {
556            if row.entry_offset().is_none() {
557                let offset = fields.reader.get_entry_offset()?;
558                row.load_entry(fields.file, offset)?;
559            }
560            read_current_row_entry(fields.file, row)
561        })
562    }
563
564    pub fn visit_entry_payloads<F>(&mut self, mut visitor: F) -> Result<()>
565    where
566        F: FnMut(&[u8]) -> Result<()>,
567    {
568        self.invalidate_entry_data_state();
569        let inner = &mut self.inner;
570        let row = &mut self.row;
571        inner.with_mut(|fields| {
572            fields.reader.release_object_guards();
573            if row.entry_offset().is_none() {
574                let offset = fields.reader.get_entry_offset()?;
575                row.load_entry(fields.file, offset)?;
576            }
577            row.restart_data()?;
578            loop {
579                let payload = match row.read_next_payload(fields.file) {
580                    Ok(Some(payload)) => payload,
581                    Ok(None) => break,
582                    Err(err) if recoverable_entry_data_error(&err) => continue,
583                    Err(err) => {
584                        let _ = row.reset_data_state(fields.file);
585                        return Err(err.into());
586                    }
587                };
588                let payload = row.payload_slice(payload);
589                if let Err(err) = visitor(payload) {
590                    let _ = row.reset_data_state(fields.file);
591                    return Err(err);
592                }
593            }
594            row.reset_data_state(fields.file)?;
595            Ok(())
596        })
597    }
598
599    pub fn clear_entry_data_state(&mut self) {
600        self.inner
601            .with_file(|file| self.row.reset_data_state_best_effort(file));
602        self.inner
603            .with_reader_mut(|reader| reader.entry_data_restart());
604    }
605
606    fn invalidate_entry_data_state(&mut self) {
607        if self.row.data_state_active() {
608            self.clear_entry_data_state();
609        }
610    }
611
612    pub fn entry_data_restart(&mut self) -> Result<()> {
613        self.inner
614            .with_file(|file| self.row.clear_pins(file))
615            .map_err(SdkError::from)?;
616        self.inner
617            .with_reader_mut(|reader| reader.entry_data_restart());
618        if self.row.entry_offset().is_none() {
619            let row = &mut self.row;
620            self.inner.with_mut(|fields| {
621                let offset = fields.reader.get_entry_offset()?;
622                row.load_entry(fields.file, offset).map(|_| ())
623            })?;
624        }
625        self.row.restart_data().map_err(Into::into)
626    }
627
628    pub fn enumerate_entry_payload(&mut self) -> Result<Option<&[u8]>> {
629        let row = &mut self.row;
630        let payload = self.inner.with_mut(|fields| {
631            fields.reader.release_object_guards();
632            row.read_next_payload(fields.file)
633        })?;
634        Ok(payload.map(|payload| self.row.payload_slice(payload)))
635    }
636
637    pub fn collect_entry_payloads(&mut self, payloads: &mut Vec<Vec<u8>>) -> Result<()> {
638        payloads.clear();
639        self.visit_entry_payloads(|payload| {
640            payloads.push(payload.to_vec());
641            Ok(())
642        })
643    }
644
645    pub fn get_entry_payload(&mut self, field: &[u8]) -> Result<Option<Vec<u8>>> {
646        let mut found = None;
647        self.visit_entry_payloads(|payload| {
648            if found.is_none()
649                && payload.len() > field.len()
650                && payload.starts_with(field)
651                && payload[field.len()] == b'='
652            {
653                found = Some(payload.to_vec());
654            }
655            Ok(())
656        })?;
657        Ok(found)
658    }
659
660    pub fn get_realtime_usec(&self) -> Result<u64> {
661        if let Some(metadata) = self.row.metadata() {
662            return Ok(metadata.realtime);
663        }
664        self.inner
665            .with(|fields| fields.reader.get_realtime_usec(fields.file))
666            .map_err(Into::into)
667    }
668
669    pub fn get_seqnum(&self) -> Result<(u64, [u8; 16])> {
670        let key = self.current_directory_entry_key()?;
671        Ok((key.seqnum, key.seqnum_id))
672    }
673
674    pub fn get_monotonic_usec(&self) -> Result<(u64, [u8; 16])> {
675        let key = self.current_directory_entry_key()?;
676        Ok((key.monotonic, key.boot_id))
677    }
678
679    pub fn get_cursor(&self) -> Result<String> {
680        if let Some(metadata) = self.row.metadata() {
681            return Ok(format_cursor_from_key(key_from_metadata(metadata)));
682        }
683        let seqnum_id = self.header_snapshot.header.seqnum_id;
684        self.inner
685            .with(|fields| build_cursor(fields.file, fields.reader, seqnum_id))
686    }
687
688    fn current_directory_entry_key(&self) -> Result<DirectoryEntryKey> {
689        if let Some(metadata) = self.row.metadata() {
690            return Ok(key_from_metadata(metadata));
691        }
692        self.inner.with(|fields| {
693            let offset = fields.reader.get_entry_offset()?;
694            let entry = fields.file.entry_ref(offset)?;
695            Ok(DirectoryEntryKey {
696                seqnum_id: self.header_snapshot.header.seqnum_id,
697                seqnum: entry.header.seqnum,
698                boot_id: entry.header.boot_id,
699                monotonic: entry.header.monotonic,
700                realtime: entry.header.realtime,
701                xor_hash: entry.header.xor_hash,
702            })
703        })
704    }
705
706    pub fn test_cursor(&self, cursor: &str) -> Result<bool> {
707        Ok(self.get_cursor()? == cursor)
708    }
709
710    pub fn add_match(&mut self, data: &[u8]) {
711        self.inner.with_reader_mut(|reader| reader.add_match(data));
712    }
713
714    pub fn add_conjunction(&mut self) -> Result<()> {
715        self.inner
716            .with_mut(|fields| fields.reader.add_conjunction(fields.file))
717            .map_err(Into::into)
718    }
719
720    pub fn add_disjunction(&mut self) -> Result<()> {
721        self.inner
722            .with_mut(|fields| fields.reader.add_disjunction(fields.file))
723            .map_err(Into::into)
724    }
725
726    pub fn flush_matches(&mut self) {
727        self.inner.with_reader_mut(|reader| reader.flush_matches());
728    }
729}
730
731impl FileReader {
732    fn header_realtime_start(&self) -> u64 {
733        self.header_snapshot.header.head_entry_realtime
734    }
735
736    pub fn enumerate_fields(&mut self) -> Result<Vec<String>> {
737        self.invalidate_entry_data_state();
738        match self.enumerate_fields_indexed() {
739            Ok(fields) => Ok(fields),
740            Err(_) => enumerate_file_fields_by_scan(self),
741        }
742    }
743
744    pub(crate) fn enumerate_fields_indexed(&mut self) -> Result<Vec<String>> {
745        self.invalidate_entry_data_state();
746        self.inner.with_file(enumerate_file_fields_indexed)
747    }
748
749    pub fn query_unique(&mut self, field_name: &str) -> Result<Vec<Vec<u8>>> {
750        let mut out = Vec::new();
751        self.visit_unique_values(field_name, |value| {
752            out.push(value.to_vec());
753            Ok(())
754        })?;
755        Ok(out)
756    }
757
758    pub fn visit_unique_values<F>(&mut self, field_name: &str, visitor: F) -> Result<()>
759    where
760        F: FnMut(&[u8]) -> Result<()>,
761    {
762        self.invalidate_entry_data_state();
763        let decompressed = self.row.decompressed_mut();
764        self.inner.with_file(|file| {
765            visit_file_unique_values_indexed(file, field_name.as_bytes(), decompressed, visitor)
766        })
767    }
768}
769
770#[cfg(test)]
771mod tests;