1use super::mmap::MemoryMap;
2use crate::error::Result;
3use crate::file::{
4 EntryItemsType,
5 cursor::{JournalCursor, Location},
6 file::{EntryDataIterator, FieldDataIterator, FieldIterator, JournalFile},
7 filter::{FilterExpr, JournalFilter, LogicalOp},
8 object::{DataObject, FieldObject},
9 offset_array::Direction,
10 value_guard::ValueGuard,
11};
12use std::num::NonZeroU64;
13
14pub struct JournalReader<'a, M: MemoryMap> {
15 cursor: JournalCursor,
16
17 filter: Option<JournalFilter>,
18 field_iterator: Option<FieldIterator<'a, M>>,
19 field_data_iterator: Option<FieldDataIterator<'a, M>>,
20 entry_data_iterator: Option<EntryDataIterator<'a, M>>,
21
22 field_guard: Option<ValueGuard<'a, FieldObject<&'a [u8]>>>,
23 data_guard: Option<ValueGuard<'a, DataObject<&'a [u8]>>>,
24 raw_payload_guard: Option<ValueGuard<'a, &'a [u8]>>,
25}
26
27#[cfg(test)]
28mod tests {
29 use super::*;
30 use crate::file::{JournalFileOptions, JournalWriter, MmapMut};
31 use tempfile::TempDir;
32
33 fn test_uuid(seed: u8) -> uuid::Uuid {
34 uuid::Uuid::from_bytes([seed; 16])
35 }
36
37 fn create_test_journal() -> (TempDir, JournalFile<MmapMut>) {
38 let dir = TempDir::new().expect("create temp dir");
39 let journal_dir = dir.path().join("journals");
40 std::fs::create_dir_all(&journal_dir).expect("create journal dir");
41 let path = journal_dir.join("system.journal");
42 let repo_file =
43 crate::repository::File::from_path(&path).expect("test journal path should parse");
44
45 let mut journal_file = JournalFile::create(
46 &repo_file,
47 JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3)),
48 )
49 .expect("create journal");
50 let mut writer =
51 JournalWriter::new(&mut journal_file, 1, test_uuid(4)).expect("create writer");
52 let payloads = [b"MESSAGE=test".as_slice(), b"PRIORITY=6".as_slice()];
53 writer
54 .add_entry(&mut journal_file, &payloads, 1_000_000, 100)
55 .expect("write entry");
56
57 (dir, journal_file)
58 }
59
60 #[test]
61 fn build_filter_returns_expr_and_consumes_pending_filter() {
62 let (_dir, journal_file) = create_test_journal();
63 let mut reader = JournalReader::<MmapMut>::default();
64 reader.add_match(b"MESSAGE=test");
65
66 let expr = reader
67 .build_filter(&journal_file)
68 .expect("build filter")
69 .expect("resolved filter expr");
70
71 assert!(!matches!(expr, FilterExpr::None));
72 assert!(reader.filter.is_none(), "pending filter should be consumed");
73 assert!(
74 reader
75 .build_filter(&journal_file)
76 .expect("second build")
77 .is_none()
78 );
79 }
80
81 #[test]
82 fn build_filter_failure_keeps_pending_filter() {
83 let (_dir, journal_file) = create_test_journal();
84 let mut reader = JournalReader::<MmapMut>::default();
85 reader.filter = Some(JournalFilter::default());
86
87 assert!(reader.build_filter(&journal_file).is_err());
88 assert!(
89 reader.filter.is_some(),
90 "pending filter should remain after build failure"
91 );
92 assert!(reader.build_filter(&journal_file).is_err());
93 }
94}
95
96impl<M: MemoryMap> std::fmt::Debug for JournalReader<'_, M> {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.debug_struct("JournalReader")
99 .field("field_guard", &self.field_guard)
101 .field("data_guard", &self.data_guard)
102 .finish()
103 }
104}
105
106impl<M: MemoryMap> Default for JournalReader<'_, M> {
107 fn default() -> Self {
108 Self {
109 cursor: JournalCursor::new(),
110 filter: None,
111 field_iterator: None,
112 field_data_iterator: None,
113 entry_data_iterator: None,
114 field_guard: None,
115 data_guard: None,
116 raw_payload_guard: None,
117 }
118 }
119}
120
121impl<'a, M: MemoryMap> JournalReader<'a, M> {
122 pub fn dump(&self, _journal_file: &'a JournalFile<M>) -> Result<String> {
123 if let Some(_filter_expr) = self.cursor.filter_expr.as_ref() {
124 Ok(String::from("filter expr active"))
125 } else {
126 Ok(String::from("no filter expr"))
127 }
128 }
129
130 pub fn set_location(&mut self, location: Location) {
131 self.cursor.set_location(location)
132 }
133
134 pub fn step(&mut self, journal_file: &'a JournalFile<M>, direction: Direction) -> Result<bool> {
135 self.drop_guards();
136
137 if let Some(filter) = self.filter.as_mut() {
138 let filter_expr = filter.build(journal_file)?;
139 self.cursor.set_filter(filter_expr);
140 self.filter = None;
141 }
142
143 self.cursor.step(journal_file, direction)
144 }
145
146 pub fn build_filter(&mut self, journal_file: &JournalFile<M>) -> Result<Option<FilterExpr>> {
160 self.drop_guards();
161 if let Some(filter) = self.filter.as_mut() {
162 let expr = filter.build(journal_file)?;
163 self.filter = None;
164 Ok(Some(expr))
165 } else {
166 Ok(None)
167 }
168 }
169
170 pub fn add_match(&mut self, data: &[u8]) {
171 self.filter.get_or_insert_default().add_match(data);
172 }
173
174 pub fn add_conjunction(&mut self, journal_file: &'a JournalFile<M>) -> Result<()> {
175 self.filter
176 .get_or_insert_default()
177 .set_operation(journal_file, LogicalOp::Conjunction)
178 }
179
180 pub fn add_disjunction(&mut self, journal_file: &'a JournalFile<M>) -> Result<()> {
181 self.filter
182 .get_or_insert_default()
183 .set_operation(journal_file, LogicalOp::Disjunction)
184 }
185
186 pub fn flush_matches(&mut self) {
187 self.cursor.clear_filter();
188 self.filter = None;
189 }
190
191 pub fn get_realtime_usec(&self, journal_file: &'a JournalFile<M>) -> Result<u64> {
192 let entry_offset = self.cursor.position()?;
193 let entry_object = journal_file.entry_ref(entry_offset)?;
194 Ok(entry_object.header.realtime)
195 }
196
197 pub fn get_seqnum(&self, journal_file: &'a JournalFile<M>) -> Result<(u64, [u8; 16])> {
198 let entry_offset = self.cursor.position()?;
199 let entry_object = journal_file.entry_ref(entry_offset)?;
200 Ok((
201 entry_object.header.seqnum,
202 journal_file.journal_header_ref().seqnum_id,
203 ))
204 }
205
206 pub fn get_entry_offset(&self) -> Result<NonZeroU64> {
207 self.cursor.position()
208 }
209
210 fn drop_guards(&mut self) {
211 self.field_guard.take();
212 self.data_guard.take();
213 self.raw_payload_guard.take();
214 }
215
216 #[doc(hidden)]
217 pub fn release_object_guards(&mut self) {
218 self.drop_guards();
219 }
220
221 pub fn fields_restart(&mut self) {
222 self.drop_guards();
223 self.field_iterator = None;
224 }
225
226 pub fn fields_enumerate(
227 &mut self,
228 journal_file: &'a JournalFile<M>,
229 ) -> Result<Option<&ValueGuard<'_, FieldObject<&'a [u8]>>>> {
230 self.drop_guards();
231
232 if self.field_iterator.is_none() {
233 self.field_iterator = Some(journal_file.fields());
234 }
235
236 if let Some(iter) = &mut self.field_iterator {
237 self.field_guard = iter.next().transpose()?;
238 Ok(self.field_guard.as_ref())
239 } else {
240 Ok(None)
241 }
242 }
243
244 pub fn field_data_query_unique(
245 &mut self,
246 journal_file: &'a JournalFile<M>,
247 field_name: &'a [u8],
248 ) -> Result<()> {
249 self.drop_guards();
250
251 self.field_data_iterator = Some(journal_file.field_data_objects(field_name)?);
252 Ok(())
253 }
254
255 pub fn field_data_restart(&mut self) {
256 self.drop_guards();
257 }
258
259 pub fn field_data_enumerate(
260 &mut self,
261 _: &'a JournalFile<M>,
262 ) -> Result<Option<&ValueGuard<'_, DataObject<&'a [u8]>>>> {
263 self.drop_guards();
264
265 if let Some(iter) = &mut self.field_data_iterator {
266 self.data_guard = iter.next().transpose()?;
267 Ok(self.data_guard.as_ref())
268 } else {
269 Ok(None)
270 }
271 }
272
273 pub fn entry_data_restart(&mut self) {
274 self.drop_guards();
275 self.entry_data_iterator = None;
276 }
277
278 pub fn entry_data_enumerate(
279 &mut self,
280 journal_file: &'a JournalFile<M>,
281 ) -> Result<Option<&ValueGuard<'_, DataObject<&'a [u8]>>>> {
282 self.drop_guards();
283
284 if self.entry_data_iterator.is_none() {
285 let entry_offset = self.cursor.position()?;
286 self.entry_data_iterator = Some(journal_file.entry_data_objects(entry_offset)?);
287 }
288
289 if let Some(iter) = &mut self.entry_data_iterator {
290 self.data_guard = iter.next().transpose()?;
291 Ok(self.data_guard.as_ref())
292 } else {
293 Ok(None)
294 }
295 }
296
297 pub fn data_object_at(
298 &mut self,
299 journal_file: &'a JournalFile<M>,
300 data_offset: NonZeroU64,
301 ) -> Result<&ValueGuard<'_, DataObject<&'a [u8]>>> {
302 self.drop_guards();
303 self.data_guard = Some(journal_file.data_ref(data_offset)?);
304 Ok(self.data_guard.as_ref().expect("data guard is present"))
305 }
306
307 #[doc(hidden)]
308 pub fn raw_data_payload_at(
309 &mut self,
310 journal_file: &'a JournalFile<M>,
311 context: crate::file::file::DataPayloadReadContext,
312 info: crate::file::file::DataPayloadObjectInfo,
313 data_offset: NonZeroU64,
314 ) -> Result<&[u8]> {
315 self.drop_guards();
316 let guard = journal_file.raw_data_payload_ref_with_info(context, data_offset, info)?;
317 self.raw_payload_guard = Some(guard);
318 Ok(**self
319 .raw_payload_guard
320 .as_ref()
321 .expect("raw payload guard is present"))
322 }
323
324 pub fn entry_data_offsets(
325 &self,
326 journal_file: &'a JournalFile<M>,
327 data_offsets: &mut Vec<NonZeroU64>,
328 ) -> Result<()> {
329 let entry_offset = self.cursor.position()?;
330 let entry_guard = journal_file.entry_ref(entry_offset)?;
331
332 match &entry_guard.items {
333 EntryItemsType::Regular(items) => {
334 for item in items.iter() {
335 if let Some(offset) = NonZeroU64::new(item.object_offset) {
336 data_offsets.push(offset);
337 }
338 }
339 }
340 EntryItemsType::Compact(items) => {
341 for item in items.iter() {
342 if let Some(offset) = NonZeroU64::new(item.object_offset as u64) {
343 data_offsets.push(offset);
344 }
345 }
346 }
347 }
348
349 Ok(())
350 }
351}