systemd_journal_reader/
systemd-journal-reader.rs

//! A library for reading systemd journal files in a streaming fashion.
//!
//! This crate provides a `JournalReader` that parses journal files and
//! iterates over their entries. It is designed to be memory-efficient and
//! safe, processing one entry at a time without loading the entire file
//! into memory or using unsafe code. It works on read-only streams (such
//! as network sockets) and does not require the `Seek` trait.
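//!
//! # Example
//!
//! A minimal usage sketch. It assumes the crate is compiled under the name
//! `systemd_journal_reader` and that the journal file path shown exists;
//! adjust both to your setup.
//!
//! ```no_run
//! use std::fs::File;
//! use systemd_journal_reader::JournalReader;
//!
//! let file = File::open("/var/log/journal/example/system.journal").unwrap();
//! let mut journal = JournalReader::new(file).unwrap();
//! while let Some(entry) = journal.next_entry() {
//!     println!("{} {:?}", entry.realtime, entry.fields.get("MESSAGE"));
//! }
//! ```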

use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{self, Read};
use std::rc::Rc;

// Constants from the systemd journal file format specification.
const SIGNATURE: &[u8; 8] = b"LPKSHHRH";
const HEADER_SIZE_MIN: u64 = 240;

const OBJECT_DATA: u8 = 1;
const OBJECT_ENTRY: u8 = 3;

const HEADER_INCOMPATIBLE_COMPRESSED_ZSTD: u32 = 1 << 3;
const HEADER_INCOMPATIBLE_COMPACT: u32 = 1 << 4;
const OBJECT_COMPRESSED_ZSTD: u8 = 1 << 2;

/// Reads a little-endian `u64` from a reader.
fn read_u64<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    reader.read_exact(&mut buf)?;
    Ok(u64::from_le_bytes(buf))
}

/// Represents the header of a systemd journal file.
#[derive(Debug, Clone)]
struct Header {
    signature: [u8; 8],
    incompatible_flags: u32,
    header_size: u64,
    arena_size: u64,
}

/// Converts a `TryFromSliceError` into an `io::Error` so slice-to-array
/// conversions can use `?` in functions returning `io::Result`.
fn slice2io(e: std::array::TryFromSliceError) -> io::Error {
    std::io::Error::new(std::io::ErrorKind::Other, e)
}

impl Header {
    /// Parses the journal header from a reader.
    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
        let mut buf = [0u8; 240]; // Read the minimum header size
        reader.read_exact(&mut buf)?;

        // Offsets follow the on-disk header layout; the fields between
        // incompatible_flags and header_size (state, file/machine/boot/seqnum
        // IDs) are not needed here.
        let signature: [u8; 8] = buf[0..8].try_into().map_err(slice2io)?;
        let incompatible_flags = u32::from_le_bytes(buf[12..16].try_into().map_err(slice2io)?);
        let header_size = u64::from_le_bytes(buf[88..96].try_into().map_err(slice2io)?);
        let arena_size = u64::from_le_bytes(buf[96..104].try_into().map_err(slice2io)?);

        Ok(Header {
            signature,
            incompatible_flags,
            header_size,
            arena_size,
        })
    }
}

/// Represents the header of an object within the journal file.
#[derive(Debug, Clone)]
struct ObjectHeader {
    object_type: u8,
    flags: u8,
    size: u64,
}

impl ObjectHeader {
    /// Parses an object header from a reader.
    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
        let mut buf = [0u8; 16];
        reader.read_exact(&mut buf)?;
        // Bytes 2..8 are reserved; the object size (which includes this
        // 16-byte header) sits at bytes 8..16.
        Ok(ObjectHeader {
            object_type: buf[0],
            flags: buf[1],
            size: u64::from_le_bytes(buf[8..16].try_into().map_err(slice2io)?),
        })
    }
}

/// Represents an entry object, which ties together multiple data objects.
#[derive(Debug, Clone)]
struct EntryObject {
    realtime: u64,
}

impl EntryObject {
    /// Parses an entry object's fixed fields from a reader.
    fn read_from<R: Read>(reader: &mut R) -> io::Result<Self> {
        // We only care about the realtime timestamp for this implementation.
        // seqnum: u64
        let _ = read_u64(reader)?;
        // realtime: u64
        let realtime = read_u64(reader)?;
        // monotonic: u64
        let _ = read_u64(reader)?;
        // boot_id: [u8; 16]
        let mut boot_id_buf = [0u8; 16];
        reader.read_exact(&mut boot_id_buf)?;
        // xor_hash: u64
        let _ = read_u64(reader)?;

        Ok(EntryObject { realtime })
    }
}

/// A journal entry.
#[derive(Debug)]
pub struct Entry {
    /// The `__REALTIME_TIMESTAMP` value, in microseconds since the Unix epoch.
    pub realtime: u64,
    /// The entry fields, keyed by field name (e.g. `MESSAGE`).
    pub fields: HashMap<Rc<str>, Rc<str>>,
}

/// Reads systemd journal files in a streaming manner from a read-only source.
pub struct JournalReader<R: Read> {
    reader: std::io::BufReader<R>,
    header: Header,
    current_offset: u64,
    /// Data objects seen so far, keyed by their offset in the file.
    data_object_cache: HashMap<u64, (Rc<str>, Rc<str>)>,
}

impl<R: Read> JournalReader<R> {
    /// Creates a new `JournalReader` from a readable source.
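    ///
    /// # Example
    ///
    /// A minimal sketch using a non-seekable source such as a TCP stream; the
    /// address (and the crate name `systemd_journal_reader`) are placeholders.
    ///
    /// ```no_run
    /// use std::net::TcpStream;
    /// use systemd_journal_reader::JournalReader;
    ///
    /// let stream = TcpStream::connect("127.0.0.1:19531").unwrap();
    /// let mut journal = JournalReader::new(stream).unwrap();
    /// ```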
    pub fn new(mut file: R) -> io::Result<JournalReader<R>> {
        let header = Header::read_from(&mut file)?;

        if &header.signature != SIGNATURE {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid journal file signature",
            ));
        }

        if header.header_size < HEADER_SIZE_MIN {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Header size is too small",
            ));
        }

        // Discard the rest of the header if it's larger than the minimum
        if header.header_size > HEADER_SIZE_MIN {
            let to_discard = header.header_size - HEADER_SIZE_MIN;
            io::copy(&mut (&mut file).take(to_discard), &mut io::sink())?;
        }

        let current_offset = header.header_size;

        Ok(JournalReader {
            reader: std::io::BufReader::new(file),
            header,
            current_offset,
            data_object_cache: HashMap::new(),
        })
    }

    /// Reads the next log entry from the journal stream.
    ///
    /// Returns `None` at the end of the stream or if a read error occurs.
    ///
    /// Note: This method buffers data objects in memory. For very large journal
    /// files without frequent entries, memory usage can grow.
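    ///
    /// # Example
    ///
    /// A sketch of filtering entries by a field; the unit name and journal
    /// path are placeholders.
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use systemd_journal_reader::JournalReader;
    /// # let mut journal = JournalReader::new(File::open("system.journal").unwrap()).unwrap();
    /// while let Some(entry) = journal.next_entry() {
    ///     if entry.fields.get("_SYSTEMD_UNIT").map(|u| &**u) == Some("sshd.service") {
    ///         println!("{:?}", entry.fields.get("MESSAGE"));
    ///     }
    /// }
    /// ```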
    pub fn next_entry(&mut self) -> Option<Entry> {
        while self.current_offset < self.header.header_size + self.header.arena_size {
            let object_start_offset = self.current_offset;

            let object_header = match ObjectHeader::read_from(&mut self.reader) {
                Ok(h) => h,
                Err(_) => return None, // End of file or read error
            };

            let object_header_size = 16u64;
            let payload_size = object_header.size.saturating_sub(object_header_size);

            let entry = match object_header.object_type {
                OBJECT_ENTRY => {
                    let entry_map = self.parse_entry_object_payload(payload_size).ok()?;
                    Some(entry_map)
                }
                OBJECT_DATA => {
                    if let Some(data_map) =
                        self.parse_data_object_payload(object_header.flags, payload_size)
                    {
                        self.data_object_cache.insert(object_start_offset, data_map);
                    }
                    None
                }
                _ => {
                    // Skip other object types by discarding their payload
                    io::copy(&mut (&mut self.reader).take(payload_size), &mut io::sink()).ok()?;
                    None
                }
            };

            // Objects are aligned to 8 bytes; skip any trailing padding.
            let padded_size = (object_header.size + 7) & !7;
            let padding = padded_size - object_header.size;
            if padding > 0 {
                io::copy(&mut (&mut self.reader).take(padding), &mut io::sink()).ok()?;
            }
            self.current_offset = object_start_offset + padded_size;
            if entry.is_some() {
                return entry;
            }
        }
        None
    }

    /// Parses the payload of a data object, returning the contained field as a
    /// (key, value) pair.
    fn parse_data_object_payload(
        &mut self,
        flags: u8,
        payload_size: u64,
    ) -> Option<(Rc<str>, Rc<str>)> {
        let is_compact = (self.header.incompatible_flags & HEADER_INCOMPATIBLE_COMPACT) != 0;

        // The DataObject's fixed fields are included in the payload size computed
        // by the caller; read and discard them to get to the actual data.
        // hash, next_hash_offset, next_field_offset, entry_offset, entry_array_offset, n_entries
        let mut data_object_fixed_size = 8 * 6;
        if is_compact {
            data_object_fixed_size += 4 + 4; // tail_entry_array_offset + tail_entry_array_n_entries
        }

        if payload_size < data_object_fixed_size {
            io::copy(&mut (&mut self.reader).take(payload_size), &mut io::sink()).ok()?;
            return None;
        }
        io::copy(
            &mut (&mut self.reader).take(data_object_fixed_size),
            &mut io::sink(),
        )
        .ok()?;

        let data_payload_size = payload_size - data_object_fixed_size;
        let mut payload_buf = vec![0u8; data_payload_size as usize];
        self.reader.read_exact(&mut payload_buf).ok()?;

        let final_payload =
            if (self.header.incompatible_flags & HEADER_INCOMPATIBLE_COMPRESSED_ZSTD != 0)
                && (flags & OBJECT_COMPRESSED_ZSTD != 0)
            {
                // Skip this data object if decompression fails rather than
                // caching an empty key/value pair.
                zstd::decode_all(payload_buf.as_slice()).ok()?
            } else {
                payload_buf
            };

        let data_str = String::from_utf8_lossy(&final_payload);
        let mut parts = data_str.splitn(2, '=');
        let key = parts.next()?;
        let value = parts.next().unwrap_or("");

        Some((key.into(), value.into()))
    }

    /// Parses the payload of an entry object, constructing the entry map from the cache.
    fn parse_entry_object_payload(&mut self, payload_size: u64) -> io::Result<Entry> {
        let entry_object = EntryObject::read_from(&mut self.reader)?;

        let mut fields = HashMap::new();

        let entry_object_fixed_size = 8 + 8 + 8 + 16 + 8;
        let mut items_payload_size = payload_size.saturating_sub(entry_object_fixed_size);

        let is_compact = (self.header.incompatible_flags & HEADER_INCOMPATIBLE_COMPACT) != 0;
        let item_size = if is_compact { 4 } else { 16 };

        while items_payload_size >= item_size {
            let data_object_offset = if is_compact {
                let mut buf = [0u8; 4];
                self.reader.read_exact(&mut buf)?;
                u32::from_le_bytes(buf) as u64
            } else {
                read_u64(&mut self.reader)?
            };

            if !is_compact {
                // Skip the hash
                let _ = read_u64(&mut self.reader)?;
            }

            if let Some((k, v)) = self.data_object_cache.get(&data_object_offset) {
                fields.insert(k.clone(), v.clone());
            }
            items_payload_size -= item_size;
        }

        // Skip any remaining padding in the entry object payload
        if items_payload_size > 0 {
            io::copy(
                &mut (&mut self.reader).take(items_payload_size),
                &mut io::sink(),
            )?;
        }

        Ok(Entry {
            realtime: entry_object.realtime,
            fields,
        })
    }
}