// systemd_journal_reader/systemd-journal-reader.rs
use std::collections::HashMap;
11use std::convert::TryInto;
12use std::io::{self, Read};
13use std::rc::Rc;
14
/// Magic bytes expected at the very start of a journal file; `JournalReader::new`
/// rejects any input whose first eight bytes differ.
const SIGNATURE: &[u8; 8] = b"LPKSHHRH";
/// Number of header bytes parsed up front, and the smallest `header_size`
/// value that `JournalReader::new` accepts.
const HEADER_SIZE_MIN: u64 = 240;

/// Object type tag whose payload is parsed as a single `KEY=value` field.
const OBJECT_DATA: u8 = 1;
/// Object type tag whose payload is parsed as a log entry.
const OBJECT_ENTRY: u8 = 3;

/// Bit 3 of the header's incompatible flags: data payloads may be zstd-compressed.
const HEADER_INCOMPATIBLE_COMPRESSED_ZSTD: u32 = 0b0000_1000;
/// Bit 4 of the header's incompatible flags: the file uses the compact layout
/// (4-byte entry items and 8 extra fixed bytes in data objects).
const HEADER_INCOMPATIBLE_COMPACT: u32 = 0b0001_0000;
/// Bit 2 of an object's flags byte: this object's payload is zstd-compressed.
const OBJECT_COMPRESSED_ZSTD: u8 = 0b0000_0100;
25
/// Consumes exactly eight bytes from `reader` and decodes them as a
/// little-endian `u64`.
///
/// # Errors
/// Propagates the error from `read_exact`, e.g. when fewer than eight bytes
/// remain.
fn read_u64<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut raw = [0u8; 8];
    reader
        .read_exact(&mut raw)
        .map(|()| u64::from_le_bytes(raw))
}
32
/// The handful of file-header fields this reader needs.
#[derive(Clone, Debug)]
struct Header {
    /// First eight bytes of the file; compared against `SIGNATURE`.
    signature: [u8; 8],
    /// Bit set tested against the `HEADER_INCOMPATIBLE_*` masks.
    incompatible_flags: u32,
    /// Total header length in bytes; the first object starts at this offset.
    header_size: u64,
    /// Number of bytes after the header that are scanned for objects.
    arena_size: u64,
}
41
/// Wraps a slice-to-array conversion failure in an `io::Error` of kind
/// `Other`, so it can be propagated with `?` from functions returning
/// `io::Result`.
fn slice2io(e: std::array::TryFromSliceError) -> io::Error {
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, e)
}
45
46impl Header {
47 fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
49 let mut buf = [0u8; 240]; reader.read_exact(&mut buf)?;
51
52 let signature: [u8; 8] = buf[0..8].try_into().map_err(slice2io)?;
53 let incompatible_flags = u32::from_le_bytes(buf[12..16].try_into().map_err(slice2io)?);
54 let header_size = u64::from_le_bytes(buf[88..96].try_into().map_err(slice2io)?);
55 let arena_size = u64::from_le_bytes(buf[96..104].try_into().map_err(slice2io)?);
56
57 Ok(Header {
58 signature,
59 incompatible_flags,
60 header_size,
61 arena_size,
62 })
63 }
64}
65
/// The 16-byte header that precedes every object's payload.
#[derive(Clone, Debug)]
struct ObjectHeader {
    /// Type tag (byte 0); compared against `OBJECT_DATA` / `OBJECT_ENTRY`.
    object_type: u8,
    /// Per-object flag bits (byte 1), e.g. `OBJECT_COMPRESSED_ZSTD`.
    flags: u8,
    /// Object size in bytes as stored in the file; the reader subtracts the
    /// 16 header bytes from it to obtain the payload length.
    size: u64,
}
73
74impl ObjectHeader {
75 fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
77 let mut buf = [0u8; 16];
78 reader.read_exact(&mut buf)?;
79 Ok(ObjectHeader {
80 object_type: buf[0],
81 flags: buf[1],
82 size: u64::from_le_bytes(buf[8..16].try_into().map_err(slice2io)?),
83 })
84 }
85}
86
/// The part of an entry object's fixed fields that this reader keeps.
#[derive(Clone, Debug)]
struct EntryObject {
    /// Second 64-bit word of the entry payload, surfaced as `Entry::realtime`.
    realtime: u64,
}
92
93impl EntryObject {
94 fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
96 let _ = read_u64(reader)?;
99 let realtime = read_u64(reader)?;
101 let _ = read_u64(reader)?;
103 let mut boot_id_buf = [0u8; 16];
105 reader.read_exact(&mut boot_id_buf)?;
106 let _ = read_u64(reader)?;
108
109 Ok(EntryObject { realtime })
110 }
111}
112
/// One journal entry as returned by `JournalReader::next_entry`.
#[derive(Debug)]
pub struct Entry {
    /// The `realtime` value stored in the entry object.
    pub realtime: u64,
    /// Field name to field value, taken from the data objects this entry
    /// references. Keys and values are reference-counted so entries that
    /// reference the same data object share its strings.
    pub fields: HashMap<Rc<str>, Rc<str>>,
}
121
122pub struct JournalReader<R: Read> {
124 reader: std::io::BufReader<R>,
125 header: Header,
126 current_offset: u64,
127 data_object_cache: HashMap<u64, (Rc<str>, Rc<str>)>,
128}
129
130impl<R: Read> JournalReader<R> {
131 pub fn new(mut file: R) -> io::Result<JournalReader<R>> {
133 let header = Header::read_from(&mut file)?;
134
135 if &header.signature != SIGNATURE {
136 return Err(io::Error::new(
137 io::ErrorKind::InvalidData,
138 "Invalid journal file signature",
139 ));
140 }
141
142 if header.header_size < HEADER_SIZE_MIN {
143 return Err(io::Error::new(
144 io::ErrorKind::InvalidData,
145 "Header size is too small",
146 ));
147 }
148
149 if header.header_size > HEADER_SIZE_MIN {
151 let to_discard = header.header_size - HEADER_SIZE_MIN;
152 io::copy(&mut (&mut file).take(to_discard), &mut io::sink())?;
153 }
154
155 let current_offset = header.header_size;
156
157 Ok(JournalReader {
158 reader: std::io::BufReader::new(file),
159 header,
160 current_offset,
161 data_object_cache: HashMap::new(),
162 })
163 }
164
165 pub fn next_entry(&mut self) -> Option<Entry> {
169 while self.current_offset < self.header.header_size + self.header.arena_size {
170 let object_start_offset = self.current_offset;
171
172 let object_header = match ObjectHeader::read_from(&mut self.reader) {
173 Ok(h) => h,
174 Err(_) => return None, };
176
177 let object_header_size = 16u64;
178 let payload_size = object_header.size.saturating_sub(object_header_size);
179
180 let entry = match object_header.object_type {
181 OBJECT_ENTRY => {
182 let entry_map = self.parse_entry_object_payload(payload_size).ok()?;
183 Some(entry_map)
184 }
185 OBJECT_DATA => {
186 if let Some(data_map) =
187 self.parse_data_object_payload(object_header.flags, payload_size)
188 {
189 self.data_object_cache.insert(object_start_offset, data_map);
190 };
191 None
192 }
193 _ => {
194 io::copy(&mut (&mut self.reader).take(payload_size), &mut io::sink()).ok()?;
196 None
197 }
198 };
199
200 let padded_size = (object_header.size + 7) & !7;
201 let padding = padded_size - object_header.size;
202 if padding > 0 {
203 io::copy(&mut (&mut self.reader).take(padding), &mut io::sink()).ok()?;
204 }
205 self.current_offset = object_start_offset + padded_size;
206 if entry.is_some() {
207 return entry;
208 }
209 }
210 None
211 }
212
213 fn parse_data_object_payload(
215 &mut self,
216 flags: u8,
217 payload_size: u64,
218 ) -> Option<(Rc<str>, Rc<str>)> {
219 let is_compact = (self.header.incompatible_flags & HEADER_INCOMPATIBLE_COMPACT) != 0;
220
221 let mut data_object_fixed_size = 8 * 6;
225 if is_compact {
226 data_object_fixed_size += 4 + 4; }
228
229 if payload_size < data_object_fixed_size {
230 io::copy(&mut (&mut self.reader).take(payload_size), &mut io::sink()).ok()?;
231 return None;
232 }
233 io::copy(
234 &mut (&mut self.reader).take(data_object_fixed_size),
235 &mut io::sink(),
236 )
237 .ok()?;
238
239 let data_payload_size = payload_size - data_object_fixed_size;
240 let mut payload_buf = vec![0u8; data_payload_size as usize];
241 self.reader.read_exact(&mut payload_buf).ok()?;
242
243 let final_payload =
244 if (self.header.incompatible_flags & HEADER_INCOMPATIBLE_COMPRESSED_ZSTD != 0)
245 && (flags & OBJECT_COMPRESSED_ZSTD != 0)
246 {
247 zstd::decode_all(payload_buf.as_slice()).unwrap_or_default()
248 } else {
249 payload_buf
250 };
251
252 let data_str = String::from_utf8_lossy(&final_payload);
253 let mut parts = data_str.splitn(2, '=');
254 let key = parts.next()?;
255 let value = parts.next().unwrap_or("");
256
257 Some((key.into(), value.into()))
258 }
259
260 fn parse_entry_object_payload(&mut self, payload_size: u64) -> io::Result<Entry> {
262 let entry_object = EntryObject::read_from(&mut self.reader)?;
263
264 let mut fields = HashMap::new();
265
266 let entry_object_fixed_size = 8 + 8 + 8 + 16 + 8;
267 let mut items_payload_size = payload_size.saturating_sub(entry_object_fixed_size);
268
269 let is_compact = (self.header.incompatible_flags & HEADER_INCOMPATIBLE_COMPACT) != 0;
270 let item_size = if is_compact { 4 } else { 16 };
271
272 while items_payload_size >= item_size {
273 let data_object_offset = if is_compact {
274 let mut buf = [0u8; 4];
275 self.reader.read_exact(&mut buf)?;
276 u32::from_le_bytes(buf) as u64
277 } else {
278 read_u64(&mut self.reader)?
279 };
280
281 if !is_compact {
282 let _ = read_u64(&mut self.reader)?;
284 }
285
286 if let Some((k, v)) = self.data_object_cache.get(&data_object_offset) {
287 fields.insert(k.clone(), v.clone());
288 }
289 items_payload_size -= item_size;
290 }
291
292 if items_payload_size > 0 {
294 io::copy(
295 &mut (&mut self.reader).take(items_payload_size),
296 &mut io::sink(),
297 )?;
298 }
299
300 Ok(Entry {
301 realtime: entry_object.realtime,
302 fields,
303 })
304 }
305}