Skip to main content

journal_core/file/
reader.rs

1use super::mmap::MemoryMap;
2use crate::error::Result;
3use crate::file::{
4    EntryItemsType,
5    cursor::{JournalCursor, Location},
6    file::{EntryDataIterator, FieldDataIterator, FieldIterator, JournalFile},
7    filter::{FilterExpr, JournalFilter, LogicalOp},
8    object::{DataObject, FieldObject},
9    offset_array::Direction,
10    value_guard::ValueGuard,
11};
12use std::num::NonZeroU64;
13
14pub struct JournalReader<'a, M: MemoryMap> {
15    cursor: JournalCursor,
16
17    filter: Option<JournalFilter>,
18    field_iterator: Option<FieldIterator<'a, M>>,
19    field_data_iterator: Option<FieldDataIterator<'a, M>>,
20    entry_data_iterator: Option<EntryDataIterator<'a, M>>,
21
22    field_guard: Option<ValueGuard<'a, FieldObject<&'a [u8]>>>,
23    data_guard: Option<ValueGuard<'a, DataObject<&'a [u8]>>>,
24    raw_payload_guard: Option<ValueGuard<'a, &'a [u8]>>,
25}
26
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::{JournalFileOptions, JournalWriter, MmapMut};
    use tempfile::TempDir;

    /// Builds a deterministic UUID whose sixteen bytes all equal `seed`.
    fn test_uuid(seed: u8) -> uuid::Uuid {
        let bytes = [seed; 16];
        uuid::Uuid::from_bytes(bytes)
    }

    /// Creates a journal file inside a fresh temporary directory and writes a
    /// single entry with a `MESSAGE` and a `PRIORITY` payload.
    ///
    /// The `TempDir` is returned alongside the file so the caller keeps the
    /// directory alive for as long as the journal is in use.
    fn create_test_journal() -> (TempDir, JournalFile<MmapMut>) {
        let temp_dir = TempDir::new().expect("create temp dir");

        let journal_dir = temp_dir.path().join("journals");
        std::fs::create_dir_all(&journal_dir).expect("create journal dir");

        let journal_path = journal_dir.join("system.journal");
        let repo_file = crate::repository::File::from_path(&journal_path)
            .expect("test journal path should parse");

        let options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
        let mut journal_file = JournalFile::create(&repo_file, options).expect("create journal");

        let mut writer =
            JournalWriter::new(&mut journal_file, 1, test_uuid(4)).expect("create writer");
        let payloads: [&[u8]; 2] = [b"MESSAGE=test", b"PRIORITY=6"];
        writer
            .add_entry(&mut journal_file, &payloads, 1_000_000, 100)
            .expect("write entry");

        (temp_dir, journal_file)
    }

    /// A successful build yields an expression and clears the pending filter,
    /// so a second build has nothing left to resolve.
    #[test]
    fn build_filter_returns_expr_and_consumes_pending_filter() {
        let (_dir, journal_file) = create_test_journal();
        let mut reader = JournalReader::<MmapMut>::default();
        reader.add_match(b"MESSAGE=test");

        let first = reader.build_filter(&journal_file).expect("build filter");
        let expr = first.expect("resolved filter expr");
        assert!(!matches!(expr, FilterExpr::None));

        assert!(reader.filter.is_none(), "pending filter should be consumed");

        let second = reader.build_filter(&journal_file).expect("second build");
        assert!(second.is_none());
    }

    /// A failed build leaves the pending filter in place, so retrying fails
    /// the same way instead of silently returning `None`.
    #[test]
    fn build_filter_failure_keeps_pending_filter() {
        let (_dir, journal_file) = create_test_journal();
        let mut reader = JournalReader::<MmapMut>::default();
        reader.filter = Some(JournalFilter::default());

        let first_attempt = reader.build_filter(&journal_file);
        assert!(first_attempt.is_err());
        assert!(
            reader.filter.is_some(),
            "pending filter should remain after build failure"
        );

        let second_attempt = reader.build_filter(&journal_file);
        assert!(second_attempt.is_err());
    }
}
95
96impl<M: MemoryMap> std::fmt::Debug for JournalReader<'_, M> {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("JournalReader")
99            // .field("cursor", &self.cursor)
100            .field("field_guard", &self.field_guard)
101            .field("data_guard", &self.data_guard)
102            .finish()
103    }
104}
105
106impl<M: MemoryMap> Default for JournalReader<'_, M> {
107    fn default() -> Self {
108        Self {
109            cursor: JournalCursor::new(),
110            filter: None,
111            field_iterator: None,
112            field_data_iterator: None,
113            entry_data_iterator: None,
114            field_guard: None,
115            data_guard: None,
116            raw_payload_guard: None,
117        }
118    }
119}
120
121impl<'a, M: MemoryMap> JournalReader<'a, M> {
122    pub fn dump(&self, _journal_file: &'a JournalFile<M>) -> Result<String> {
123        if let Some(_filter_expr) = self.cursor.filter_expr.as_ref() {
124            Ok(String::from("filter expr active"))
125        } else {
126            Ok(String::from("no filter expr"))
127        }
128    }
129
130    pub fn set_location(&mut self, location: Location) {
131        self.cursor.set_location(location)
132    }
133
134    pub fn step(&mut self, journal_file: &'a JournalFile<M>, direction: Direction) -> Result<bool> {
135        self.drop_guards();
136
137        if let Some(filter) = self.filter.as_mut() {
138            let filter_expr = filter.build(journal_file)?;
139            self.cursor.set_filter(filter_expr);
140            self.filter = None;
141        }
142
143        self.cursor.step(journal_file, direction)
144    }
145
146    /// Build the pending filter expression (if any) and return it.
147    ///
148    /// After `add_match` / `add_disjunction` calls, the filter is stored inside
149    /// the reader in an unresolved form.  This method resolves it against
150    /// `journal_file`'s hash table and returns the resulting [`FilterExpr`].
151    /// On success, the internal pending filter is consumed; subsequent calls
152    /// return `Ok(None)` until new matches are added. If resolution fails, the
153    /// pending filter remains installed so the caller can retry or fall back.
154    ///
155    /// This is useful when the caller wants to drive iteration through
156    /// [`JournalCursor`] directly rather than through [`JournalReader::step`].
157    /// The returned filter is not installed on the reader cursor; callers that
158    /// need cursor-based iteration should set it on their own cursor.
159    pub fn build_filter(&mut self, journal_file: &JournalFile<M>) -> Result<Option<FilterExpr>> {
160        self.drop_guards();
161        if let Some(filter) = self.filter.as_mut() {
162            let expr = filter.build(journal_file)?;
163            self.filter = None;
164            Ok(Some(expr))
165        } else {
166            Ok(None)
167        }
168    }
169
170    pub fn add_match(&mut self, data: &[u8]) {
171        self.filter.get_or_insert_default().add_match(data);
172    }
173
174    pub fn add_conjunction(&mut self, journal_file: &'a JournalFile<M>) -> Result<()> {
175        self.filter
176            .get_or_insert_default()
177            .set_operation(journal_file, LogicalOp::Conjunction)
178    }
179
180    pub fn add_disjunction(&mut self, journal_file: &'a JournalFile<M>) -> Result<()> {
181        self.filter
182            .get_or_insert_default()
183            .set_operation(journal_file, LogicalOp::Disjunction)
184    }
185
186    pub fn flush_matches(&mut self) {
187        self.cursor.clear_filter();
188        self.filter = None;
189    }
190
191    pub fn get_realtime_usec(&self, journal_file: &'a JournalFile<M>) -> Result<u64> {
192        let entry_offset = self.cursor.position()?;
193        let entry_object = journal_file.entry_ref(entry_offset)?;
194        Ok(entry_object.header.realtime)
195    }
196
197    pub fn get_seqnum(&self, journal_file: &'a JournalFile<M>) -> Result<(u64, [u8; 16])> {
198        let entry_offset = self.cursor.position()?;
199        let entry_object = journal_file.entry_ref(entry_offset)?;
200        Ok((
201            entry_object.header.seqnum,
202            journal_file.journal_header_ref().seqnum_id,
203        ))
204    }
205
206    pub fn get_entry_offset(&self) -> Result<NonZeroU64> {
207        self.cursor.position()
208    }
209
210    fn drop_guards(&mut self) {
211        self.field_guard.take();
212        self.data_guard.take();
213        self.raw_payload_guard.take();
214    }
215
216    #[doc(hidden)]
217    pub fn release_object_guards(&mut self) {
218        self.drop_guards();
219    }
220
221    pub fn fields_restart(&mut self) {
222        self.drop_guards();
223        self.field_iterator = None;
224    }
225
226    pub fn fields_enumerate(
227        &mut self,
228        journal_file: &'a JournalFile<M>,
229    ) -> Result<Option<&ValueGuard<'_, FieldObject<&'a [u8]>>>> {
230        self.drop_guards();
231
232        if self.field_iterator.is_none() {
233            self.field_iterator = Some(journal_file.fields());
234        }
235
236        if let Some(iter) = &mut self.field_iterator {
237            self.field_guard = iter.next().transpose()?;
238            Ok(self.field_guard.as_ref())
239        } else {
240            Ok(None)
241        }
242    }
243
244    pub fn field_data_query_unique(
245        &mut self,
246        journal_file: &'a JournalFile<M>,
247        field_name: &'a [u8],
248    ) -> Result<()> {
249        self.drop_guards();
250
251        self.field_data_iterator = Some(journal_file.field_data_objects(field_name)?);
252        Ok(())
253    }
254
255    pub fn field_data_restart(&mut self) {
256        self.drop_guards();
257    }
258
259    pub fn field_data_enumerate(
260        &mut self,
261        _: &'a JournalFile<M>,
262    ) -> Result<Option<&ValueGuard<'_, DataObject<&'a [u8]>>>> {
263        self.drop_guards();
264
265        if let Some(iter) = &mut self.field_data_iterator {
266            self.data_guard = iter.next().transpose()?;
267            Ok(self.data_guard.as_ref())
268        } else {
269            Ok(None)
270        }
271    }
272
273    pub fn entry_data_restart(&mut self) {
274        self.drop_guards();
275        self.entry_data_iterator = None;
276    }
277
278    pub fn entry_data_enumerate(
279        &mut self,
280        journal_file: &'a JournalFile<M>,
281    ) -> Result<Option<&ValueGuard<'_, DataObject<&'a [u8]>>>> {
282        self.drop_guards();
283
284        if self.entry_data_iterator.is_none() {
285            let entry_offset = self.cursor.position()?;
286            self.entry_data_iterator = Some(journal_file.entry_data_objects(entry_offset)?);
287        }
288
289        if let Some(iter) = &mut self.entry_data_iterator {
290            self.data_guard = iter.next().transpose()?;
291            Ok(self.data_guard.as_ref())
292        } else {
293            Ok(None)
294        }
295    }
296
297    pub fn data_object_at(
298        &mut self,
299        journal_file: &'a JournalFile<M>,
300        data_offset: NonZeroU64,
301    ) -> Result<&ValueGuard<'_, DataObject<&'a [u8]>>> {
302        self.drop_guards();
303        self.data_guard = Some(journal_file.data_ref(data_offset)?);
304        Ok(self.data_guard.as_ref().expect("data guard is present"))
305    }
306
307    #[doc(hidden)]
308    pub fn raw_data_payload_at(
309        &mut self,
310        journal_file: &'a JournalFile<M>,
311        context: crate::file::file::DataPayloadReadContext,
312        info: crate::file::file::DataPayloadObjectInfo,
313        data_offset: NonZeroU64,
314    ) -> Result<&[u8]> {
315        self.drop_guards();
316        let guard = journal_file.raw_data_payload_ref_with_info(context, data_offset, info)?;
317        self.raw_payload_guard = Some(guard);
318        Ok(**self
319            .raw_payload_guard
320            .as_ref()
321            .expect("raw payload guard is present"))
322    }
323
324    pub fn entry_data_offsets(
325        &self,
326        journal_file: &'a JournalFile<M>,
327        data_offsets: &mut Vec<NonZeroU64>,
328    ) -> Result<()> {
329        let entry_offset = self.cursor.position()?;
330        let entry_guard = journal_file.entry_ref(entry_offset)?;
331
332        match &entry_guard.items {
333            EntryItemsType::Regular(items) => {
334                for item in items.iter() {
335                    if let Some(offset) = NonZeroU64::new(item.object_offset) {
336                        data_offsets.push(offset);
337                    }
338                }
339            }
340            EntryItemsType::Compact(items) => {
341                for item in items.iter() {
342                    if let Some(offset) = NonZeroU64::new(item.object_offset as u64) {
343                        data_offsets.push(offset);
344                    }
345                }
346            }
347        }
348
349        Ok(())
350    }
351}