use std::{
collections::HashMap,
fs::{File, OpenOptions},
io::{self, Read as _, Seek, SeekFrom, Write},
path::Path,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::{SystemTime, UNIX_EPOCH},
};
use uuid::Uuid;
use crate::{
def::*,
error::{Error, Result},
hash::hash64,
};
/// Compression algorithm applied to journal payloads.
///
/// Only `None` is always available; the other variants exist solely when
/// the corresponding cargo feature is compiled in, so `match`es over this
/// enum must be feature-gated the same way.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
None,
#[cfg(feature = "xz-compression")]
Xz,
#[cfg(feature = "lz4-compression")]
Lz4,
#[cfg(feature = "zstd-compression")]
Zstd,
}
/// Best compression algorithm compiled into this build, with preference
/// order zstd > lz4 > xz > none.
///
/// Each arm is guarded with `not(...)` of every higher-priority feature so
/// that building with several compression features enabled at once does not
/// trigger unreachable-code warnings on the lower-priority `return`s (the
/// original `#[allow(unreachable_code)]` covered only the final expression).
/// The selected algorithm is unchanged for every feature combination.
pub fn compression_default() -> Compression {
    #[cfg(feature = "zstd-compression")]
    return Compression::Zstd;
    #[cfg(all(feature = "lz4-compression", not(feature = "zstd-compression")))]
    return Compression::Lz4;
    #[cfg(all(
        feature = "xz-compression",
        not(feature = "zstd-compression"),
        not(feature = "lz4-compression")
    ))]
    return Compression::Xz;
    // Reached only when no compression feature is enabled; the `allow` keeps
    // the expression warning-free when one of the returns above exists.
    #[allow(unreachable_code)]
    Compression::None
}
/// Compression algorithm requested via `$SYSTEMD_JOURNAL_COMPRESS`.
///
/// Falls back to [`compression_default`] when the variable is unset, set to
/// a truthy value, or names an algorithm this build doesn't support.
pub fn compression_requested() -> Compression {
    let Ok(raw) = std::env::var("SYSTEMD_JOURNAL_COMPRESS") else {
        return compression_default();
    };
    match raw.to_ascii_lowercase().as_str() {
        "0" | "false" | "no" => Compression::None,
        "1" | "true" | "yes" => compression_default(),
        #[cfg(feature = "xz-compression")]
        "xz" => Compression::Xz,
        #[cfg(feature = "lz4-compression")]
        "lz4" => Compression::Lz4,
        #[cfg(feature = "zstd-compression")]
        "zstd" => Compression::Zstd,
        _ => compression_default(),
    }
}
/// States of the asynchronous offline/sync state machine, stored in an
/// `AtomicU8` (see `JournalWriter::offline_state`). Discriminants are the
/// raw byte values used for that atomic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
#[repr(u8)]
enum OfflineState {
    Joined = 0,
    Syncing = 1,
    Offlining = 2,
    Cancel = 3,
    AgainFromSyncing = 4,
    AgainFromOfflining = 5,
    Done = 6,
}
impl OfflineState {
    /// Decodes a raw state byte; any out-of-range value maps to `Joined`.
    #[allow(dead_code)]
    fn from_u8(v: u8) -> Self {
        const STATES: [OfflineState; 7] = [
            OfflineState::Joined,
            OfflineState::Syncing,
            OfflineState::Offlining,
            OfflineState::Cancel,
            OfflineState::AgainFromSyncing,
            OfflineState::AgainFromOfflining,
            OfflineState::Done,
        ];
        STATES.get(v as usize).copied().unwrap_or(OfflineState::Joined)
    }
}
/// Hook invoked after journal content changes (see
/// `journal_file_post_change` call sites); boxed for storage, `Send` so the
/// writer can move across threads.
type PostChangeCallback = Box<dyn FnMut() + Send>;
/// Upper bound on hash-collision chain length.
/// NOTE(review): not referenced in this chunk — confirm enforcement site.
const HASH_CHAIN_DEPTH_MAX: u64 = 100;
/// Default minimum payload size (bytes) before compression is attempted.
const DEFAULT_COMPRESS_THRESHOLD: u64 = 512;
/// Floor for a user-configured compression threshold.
/// NOTE(review): clamping not visible in this chunk — confirm at use sites.
const MIN_COMPRESS_THRESHOLD: u64 = 8;
/// Smallest journal file size (512 KiB); used as the metrics `min_size`.
const JOURNAL_FILE_SIZE_MIN: u64 = 512 * 1024;
/// Size and retention limits for a journal file, mirroring systemd's
/// `JournalMetrics`. A field value of `u64::MAX` is an "unset" sentinel
/// (the `Default` impl marks every field unset).
/// NOTE(review): enforcement of these limits is not visible in this chunk;
/// semantics assumed to match systemd — confirm at use sites.
#[derive(Debug, Clone, PartialEq)]
pub struct JournalMetrics {
pub max_size: u64,
pub min_size: u64,
pub max_use: u64,
pub min_use: u64,
pub keep_free: u64,
pub n_max_files: u64,
}
impl Default for JournalMetrics {
// Every field defaults to the u64::MAX "unset" sentinel.
fn default() -> Self {
Self {
max_size: u64::MAX, min_size: u64::MAX,
max_use: u64::MAX,
min_use: u64::MAX,
keep_free: u64::MAX,
n_max_files: u64::MAX,
}
}
}
pub fn journal_reset_metrics(m: &mut JournalMetrics) {
*m = JournalMetrics::default();
}
/// Field-wise equality of two metric sets (derived `PartialEq`).
pub fn journal_metrics_equal(x: &JournalMetrics, y: &JournalMetrics) -> bool {
    *x == *y
}
/// Current boot id: on Linux, the kernel-provided value from
/// /proc/sys/kernel/random/boot_id; on read/parse failure (or on other
/// platforms) a freshly generated random UUID instead.
fn get_boot_id() -> [u8; 16] {
    #[cfg(target_os = "linux")]
    {
        let parsed = std::fs::read_to_string("/proc/sys/kernel/random/boot_id")
            .ok()
            .and_then(|text| Uuid::parse_str(text.trim()).ok());
        if let Some(id) = parsed {
            return *id.as_bytes();
        }
    }
    *Uuid::new_v4().as_bytes()
}
/// Wall-clock time in microseconds since the Unix epoch; returns 0 if the
/// system clock reports a pre-epoch time.
fn realtime_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
/// Monotonic clock reading in microseconds.
///
/// Linux uses `CLOCK_MONOTONIC`; other platforms fall back to the wall
/// clock (no portable monotonic-microsecond source here).
fn monotonic_now() -> u64 {
    #[cfg(target_os = "linux")]
    {
        let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
        // SAFETY: `ts` is a valid, writable timespec; clock_gettime only
        // writes into it.
        unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
        let secs_us = (ts.tv_sec as u64) * 1_000_000;
        let frac_us = (ts.tv_nsec as u64) / 1_000;
        secs_us + frac_us
    }
    #[cfg(not(target_os = "linux"))]
    {
        realtime_now()
    }
}
/// Machine id: on Linux, the value from /etc/machine-id; on read/parse
/// failure (or on other platforms) a freshly generated random UUID.
fn machine_id() -> [u8; 16] {
    #[cfg(target_os = "linux")]
    {
        let parsed = std::fs::read_to_string("/etc/machine-id")
            .ok()
            .and_then(|text| Uuid::parse_str(text.trim()).ok());
        if let Some(id) = parsed {
            return *id.as_bytes();
        }
    }
    *Uuid::new_v4().as_bytes()
}
type DataTailCache = HashMap<u64, (u64, u64)>;
/// Whether a header-stored object offset is acceptable.
///
/// Zero means "unset" and always passes; otherwise the offset must satisfy
/// `valid64` (alignment/range check from `def`) and lie within
/// `[header_size, tail_object_offset]`.
pub fn offset_is_valid(offset: u64, header_size: u64, tail_object_offset: u64) -> bool {
    offset == 0
        || (valid64(offset) && offset >= header_size && offset <= tail_object_offset)
}
/// Smallest legal on-disk size for an object of the given type, i.e. its
/// header portion (compact-aware for DATA objects).
pub fn minimum_header_size(obj_type: ObjectType, compact: bool) -> u64 {
    // TAG payload: 8 bytes seqnum + 8 bytes epoch + 32 bytes tag.
    const TAG_PAYLOAD: u64 = 8 + 8 + 32;
    match obj_type {
        ObjectType::Data => data_payload_offset(compact),
        ObjectType::Field => FIELD_OBJECT_HEADER_SIZE as u64,
        ObjectType::Entry => ENTRY_OBJECT_HEADER_SIZE as u64,
        ObjectType::EntryArray => ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64,
        ObjectType::Tag => OBJECT_HEADER_SIZE as u64 + TAG_PAYLOAD,
        ObjectType::DataHashTable | ObjectType::FieldHashTable | ObjectType::Unused => {
            OBJECT_HEADER_SIZE as u64
        }
    }
}
pub fn check_object_header(obj_type: u8, obj_size: u64, offset: u64, compact: bool, expected_type: Option<ObjectType>) -> Result<()> {
if obj_size == 0 {
return Err(Error::CorruptObject {
offset,
reason: "attempt to move to uninitialized object".into(),
});
}
if obj_type == 0 || obj_type > ObjectType::Tag as u8 {
return Err(Error::CorruptObject {
offset,
reason: format!("invalid object type {}", obj_type),
});
}
let otype = ObjectType::try_from(obj_type).map_err(|_| Error::CorruptObject {
offset,
reason: format!("invalid object type {}", obj_type),
})?;
if let Some(expected) = expected_type {
if expected != ObjectType::Unused && otype != expected {
return Err(Error::CorruptObject {
offset,
reason: format!(
"expected {:?} object, got {:?}",
expected, otype
),
});
}
}
let min_size = minimum_header_size(otype, compact);
if obj_size < min_size {
return Err(Error::CorruptObject {
offset,
reason: format!(
"object size {} too small for type {:?} (minimum {})",
obj_size, otype, min_size
),
});
}
if obj_size < OBJECT_HEADER_SIZE as u64 {
return Err(Error::CorruptObject {
offset,
reason: format!(
"object size {} less than header size {}",
obj_size, OBJECT_HEADER_SIZE
),
});
}
Ok(())
}
/// Full validity check for a decoded object of type `obj_type`.
///
/// Only the parameters relevant to that type are consulted: the `data_*`,
/// `entry_*`, `entry_array_*` and `tag_*` arguments carry the respective
/// object's decoded fields; callers pass zeros/defaults for the rest.
/// NOTE(review): for FIELD objects the `data_next_field_offset` slot
/// apparently carries head_data_offset (see that arm's error text) —
/// confirm at call sites.
///
/// # Errors
/// `Error::CorruptObject` describing the first violated invariant.
#[allow(clippy::too_many_arguments)]
pub fn check_object(
obj_type: ObjectType,
obj_size: u64,
flags: u8,
offset: u64,
compact: bool,
_data_hash: u64,
data_next_hash_offset: u64,
data_next_field_offset: u64,
data_entry_offset: u64,
data_entry_array_offset: u64,
data_n_entries: u64,
entry_seqnum: u64,
entry_realtime: u64,
entry_monotonic: u64,
entry_boot_id: &[u8; 16],
entry_array_next: u64,
tag_epoch: u64,
) -> Result<()> {
// Generic size/type checks first; type-specific invariants follow.
check_object_header(obj_type as u8, obj_size, offset, compact, None)?;
match obj_type {
ObjectType::Data => {
// Must contain at least one payload byte past the (compact-aware) header.
if obj_size <= data_payload_offset(compact) {
return Err(Error::CorruptObject {
offset,
reason: "DATA object has no payload".into(),
});
}
// entry_offset and n_entries must be zero or non-zero together.
if (data_entry_offset == 0) != (data_n_entries == 0) {
return Err(Error::CorruptObject {
offset,
reason: format!(
"DATA entry_offset={} but n_entries={}",
data_entry_offset, data_n_entries
),
});
}
// All link offsets, when set, must be 8-byte aligned (valid64).
if data_next_hash_offset != 0 && !valid64(data_next_hash_offset) {
return Err(Error::CorruptObject {
offset,
reason: "DATA next_hash_offset not aligned".into(),
});
}
if data_next_field_offset != 0 && !valid64(data_next_field_offset) {
return Err(Error::CorruptObject {
offset,
reason: "DATA next_field_offset not aligned".into(),
});
}
if data_entry_offset != 0 && !valid64(data_entry_offset) {
return Err(Error::CorruptObject {
offset,
reason: "DATA entry_offset not aligned".into(),
});
}
if data_entry_array_offset != 0 && !valid64(data_entry_array_offset) {
return Err(Error::CorruptObject {
offset,
reason: "DATA entry_array_offset not aligned".into(),
});
}
// At most one compression-algorithm flag bit may be set.
let compressed = flags & obj_flags::COMPRESSED_MASK;
if compressed != 0 && compressed.count_ones() > 1 {
return Err(Error::CorruptObject {
offset,
reason: "DATA has multiple compression flags set".into(),
});
}
}
ObjectType::Field => {
// Field objects need at least one payload byte (the field name).
if obj_size <= FIELD_OBJECT_HEADER_SIZE as u64 {
return Err(Error::CorruptObject {
offset,
reason: "FIELD object has no payload".into(),
});
}
if data_next_hash_offset != 0 && !valid64(data_next_hash_offset) {
return Err(Error::CorruptObject {
offset,
reason: "FIELD next_hash_offset not aligned".into(),
});
}
// NOTE(review): the parameter slot is next_field_offset but the
// message says head_data_offset — parameter reuse, see doc above.
if data_next_field_offset != 0 && !valid64(data_next_field_offset) {
return Err(Error::CorruptObject {
offset,
reason: "FIELD head_data_offset not aligned".into(),
});
}
}
ObjectType::Entry => {
if obj_size < ENTRY_OBJECT_HEADER_SIZE as u64 {
return Err(Error::CorruptObject {
offset,
reason: "ENTRY object too small".into(),
});
}
// The items region must hold a whole, non-zero number of items.
let items_bytes = obj_size - ENTRY_OBJECT_HEADER_SIZE as u64;
let item_size = entry_item_size(compact);
if items_bytes % item_size != 0 {
return Err(Error::CorruptObject {
offset,
reason: format!(
"ENTRY items region {} not divisible by item size {}",
items_bytes, item_size
),
});
}
let n_items = items_bytes / item_size;
if n_items == 0 {
return Err(Error::CorruptObject {
offset,
reason: "ENTRY has no items".into(),
});
}
if entry_seqnum == 0 {
return Err(Error::CorruptObject {
offset,
reason: "ENTRY seqnum is zero".into(),
});
}
// Timestamps are bounded by 2^55 (same limit used elsewhere in
// this module); realtime additionally must be non-zero.
const TIMESTAMP_UPPER: u64 = 1u64 << 55;
if entry_realtime == 0 || entry_realtime >= TIMESTAMP_UPPER {
return Err(Error::CorruptObject {
offset,
reason: format!("ENTRY realtime {} invalid", entry_realtime),
});
}
if entry_monotonic >= TIMESTAMP_UPPER {
return Err(Error::CorruptObject {
offset,
reason: format!("ENTRY monotonic {} invalid", entry_monotonic),
});
}
if entry_boot_id == &[0u8; 16] {
return Err(Error::CorruptObject {
offset,
reason: "ENTRY boot_id is null".into(),
});
}
}
ObjectType::DataHashTable | ObjectType::FieldHashTable => {
// Bucket array must be a whole, non-zero number of HashItems.
let items_bytes = obj_size.saturating_sub(OBJECT_HEADER_SIZE as u64);
if items_bytes % HASH_ITEM_SIZE as u64 != 0 {
return Err(Error::CorruptObject {
offset,
reason: format!(
"HASH_TABLE items region {} not divisible by HashItem size {}",
items_bytes, HASH_ITEM_SIZE
),
});
}
let n_items = items_bytes / HASH_ITEM_SIZE as u64;
if n_items == 0 {
return Err(Error::CorruptObject {
offset,
reason: "HASH_TABLE has no items".into(),
});
}
}
ObjectType::EntryArray => {
// Slot array must be a whole, non-zero number of (compact-aware) items.
let ea_item_size = entry_array_item_size(compact);
let items_bytes = obj_size.saturating_sub(ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64);
if items_bytes % ea_item_size != 0 {
return Err(Error::CorruptObject {
offset,
reason: format!(
"ENTRY_ARRAY items region {} not divisible by {}",
items_bytes, ea_item_size
),
});
}
let n_items = items_bytes / ea_item_size;
if n_items == 0 {
return Err(Error::CorruptObject {
offset,
reason: "ENTRY_ARRAY has no items".into(),
});
}
// The chain link, when set, must be aligned and strictly forward —
// this also rules out self-loops in the entry-array chain.
if entry_array_next != 0 {
if !valid64(entry_array_next) {
return Err(Error::CorruptObject {
offset,
reason: "ENTRY_ARRAY next offset not aligned".into(),
});
}
if entry_array_next <= offset {
return Err(Error::CorruptObject {
offset,
reason: format!(
"ENTRY_ARRAY next {} <= current offset {}",
entry_array_next, offset
),
});
}
}
}
ObjectType::Tag => {
// TAG objects are fixed-size: header + seqnum(8) + epoch(8) + tag(32).
const TAG_OBJECT_SIZE: u64 = OBJECT_HEADER_SIZE as u64 + 8 + 8 + 32;
if obj_size != TAG_OBJECT_SIZE {
return Err(Error::CorruptObject {
offset,
reason: format!("TAG object size {} != expected {}", obj_size, TAG_OBJECT_SIZE),
});
}
const EPOCH_UPPER: u64 = 1u64 << 55;
if tag_epoch >= EPOCH_UPPER {
return Err(Error::CorruptObject {
offset,
reason: format!("TAG epoch {} invalid (>= 2^55)", tag_epoch),
});
}
}
ObjectType::Unused => {
// Nothing beyond the generic header checks.
}
}
Ok(())
}
/// Validates a journal `Header` read from disk against the actual file
/// size. With `writable`, the stricter invariants required before
/// appending are enforced as well (exact current header size, supported
/// and required compatible flags, non-archived state, populated hash
/// tables, machine-id match).
///
/// # Errors
/// `Error::InvalidFile` or `Error::IncompatibleFlags` describing the first
/// violated invariant.
pub fn verify_header(
header: &Header,
file_size: u64,
writable: bool,
current_machine_id: Option<[u8; 16]>,
) -> Result<()> {
// Magic signature first.
if header.signature != HEADER_SIGNATURE {
return Err(Error::InvalidFile("bad signature".into()));
}
// Unknown incompatible flags: the file may not be used at all.
let incompat = from_le32(&header.incompatible_flags);
let unsupported = incompat & !incompat::SUPPORTED;
if unsupported != 0 {
return Err(Error::IncompatibleFlags { flags: unsupported });
}
if writable {
// Unknown *compatible* flags are fine for reading but not for writing,
// since we would fail to maintain their invariants.
let compat = from_le32(&header.compatible_flags);
let supported_compat = compat::TAIL_ENTRY_BOOT_ID;
let unsupported_compat = compat & !supported_compat;
if unsupported_compat != 0 {
return Err(Error::InvalidFile(format!(
"unsupported compatible flags {:#x} for writable mode",
unsupported_compat
)));
}
}
let state = header.state;
if state != FileState::Offline as u8
&& state != FileState::Online as u8
&& state != FileState::Archived as u8
{
return Err(Error::InvalidFile(format!("invalid state {}", state)));
}
// Header size: writers need the current full header, readers accept
// older, smaller headers down to HEADER_SIZE_MIN.
let header_size = from_le64(&header.header_size);
if writable {
if header_size < HEADER_SIZE {
return Err(Error::InvalidFile(format!(
"header_size {} < minimum {} for writable mode",
header_size, HEADER_SIZE
)));
}
} else {
if header_size < HEADER_SIZE_MIN {
return Err(Error::InvalidFile(format!(
"header_size {} < minimum {}",
header_size, HEADER_SIZE_MIN
)));
}
}
if writable && header_size != HEADER_SIZE {
return Err(Error::InvalidFile(format!(
"writable mode requires header_size == {}, got {}",
HEADER_SIZE, header_size
)));
}
if writable {
// We only append to files we can keep consistent, which requires the
// TAIL_ENTRY_BOOT_ID bookkeeping.
let compat = from_le32(&header.compatible_flags);
if compat & compat::TAIL_ENTRY_BOOT_ID == 0 {
return Err(Error::InvalidFile(
"writable mode requires TAIL_ENTRY_BOOT_ID compatible flag".into(),
));
}
}
{
// SEALED files carry the n_entry_arrays counter; the header must be
// large enough to contain it (field ends at byte 240).
let compat = from_le32(&header.compatible_flags);
if (compat & compat::SEALED) != 0 {
const N_ENTRY_ARRAYS_END: u64 = 240;
if header_size < N_ENTRY_ARRAYS_END {
return Err(Error::InvalidFile(
"SEALED flag set but header too small for n_entry_arrays field".into(),
));
}
}
}
// header + arena must fit in the file without overflow.
let arena_size = from_le64(&header.arena_size);
if header_size.checked_add(arena_size).is_none() {
return Err(Error::InvalidFile("arena_size overflow".into()));
}
let total = header_size + arena_size;
if total > file_size {
return Err(Error::InvalidFile(format!(
"header_size ({}) + arena_size ({}) = {} > file_size ({})",
header_size, arena_size, total, file_size
)));
}
// tail_object_offset, when set, must be aligned, inside the arena, and
// leave room for at least an ObjectHeader.
let tail_object_offset = from_le64(&header.tail_object_offset);
if tail_object_offset != 0 {
if !valid64(tail_object_offset) {
return Err(Error::InvalidFile(
"tail_object_offset not aligned".into(),
));
}
if tail_object_offset < header_size {
return Err(Error::InvalidFile(
"tail_object_offset before header end".into(),
));
}
if tail_object_offset >= total {
return Err(Error::InvalidFile(
"tail_object_offset beyond file end".into(),
));
}
if total - tail_object_offset < OBJECT_HEADER_SIZE as u64 {
return Err(Error::InvalidFile(
"not enough space at tail_object_offset for ObjectHeader".into(),
));
}
}
// Data hash table: offset/size must be set together, aligned, a whole
// number of HashItems, and fully inside the arena.
let data_ht_offset = from_le64(&header.data_hash_table_offset);
let data_ht_size = from_le64(&header.data_hash_table_size);
if data_ht_offset != 0 || data_ht_size != 0 {
if data_ht_offset == 0 || data_ht_size == 0 {
return Err(Error::InvalidFile(
"data hash table offset/size partially zero".into(),
));
}
if !valid64(data_ht_offset) {
return Err(Error::InvalidFile(
"data_hash_table_offset not aligned".into(),
));
}
if data_ht_size % HASH_ITEM_SIZE as u64 != 0 {
return Err(Error::InvalidFile(
"data_hash_table_size not multiple of HashItem".into(),
));
}
// The stored offset points past the table's ObjectHeader, so it must
// leave room for that header after the file header.
if data_ht_offset < header_size + OBJECT_HEADER_SIZE as u64 {
return Err(Error::InvalidFile(
"data_hash_table_offset too small".into(),
));
}
if data_ht_offset
.checked_add(data_ht_size)
.map_or(true, |end| end > total)
{
return Err(Error::InvalidFile(
"data hash table extends beyond file".into(),
));
}
if tail_object_offset != 0 {
let obj_start = data_ht_offset - OBJECT_HEADER_SIZE as u64;
if obj_start > tail_object_offset {
return Err(Error::InvalidFile(
"data_hash_table_offset beyond tail_object_offset".into(),
));
}
}
}
// Field hash table: same battery of checks.
let field_ht_offset = from_le64(&header.field_hash_table_offset);
let field_ht_size = from_le64(&header.field_hash_table_size);
if field_ht_offset != 0 || field_ht_size != 0 {
if field_ht_offset == 0 || field_ht_size == 0 {
return Err(Error::InvalidFile(
"field hash table offset/size partially zero".into(),
));
}
if !valid64(field_ht_offset) {
return Err(Error::InvalidFile(
"field_hash_table_offset not aligned".into(),
));
}
if field_ht_size % HASH_ITEM_SIZE as u64 != 0 {
return Err(Error::InvalidFile(
"field_hash_table_size not multiple of HashItem".into(),
));
}
if field_ht_offset < header_size + OBJECT_HEADER_SIZE as u64 {
return Err(Error::InvalidFile(
"field_hash_table_offset too small".into(),
));
}
if field_ht_offset
.checked_add(field_ht_size)
.map_or(true, |end| end > total)
{
return Err(Error::InvalidFile(
"field hash table extends beyond file".into(),
));
}
if tail_object_offset != 0 {
let obj_start = field_ht_offset - OBJECT_HEADER_SIZE as u64;
if obj_start > tail_object_offset {
return Err(Error::InvalidFile(
"field_hash_table_offset beyond tail_object_offset".into(),
));
}
}
}
// Entry array chain: head/tail offsets and entry counts must be mutually
// consistent.
let entry_array_offset = from_le64(&header.entry_array_offset);
if !offset_is_valid(entry_array_offset, header_size, tail_object_offset) {
return Err(Error::InvalidFile(
"entry_array_offset invalid".into(),
));
}
let n_entries = from_le64(&header.n_entries);
let tail_entry_array_offset = from_le32(&header.tail_entry_array_offset) as u64;
let tail_entry_array_n_entries = from_le32(&header.tail_entry_array_n_entries) as u64;
if !offset_is_valid(tail_entry_array_offset, header_size, tail_object_offset) {
return Err(Error::InvalidFile(
"tail_entry_array_offset invalid".into(),
));
}
if entry_array_offset != 0 && tail_entry_array_offset != 0
&& entry_array_offset > tail_entry_array_offset
{
return Err(Error::InvalidFile(
"entry_array_offset > tail_entry_array_offset".into(),
));
}
if entry_array_offset == 0 && tail_entry_array_offset != 0 {
return Err(Error::InvalidFile(
"tail_entry_array set but entry_array_offset is zero".into(),
));
}
if (tail_entry_array_offset == 0) != (tail_entry_array_n_entries == 0) {
return Err(Error::InvalidFile(
"tail_entry_array_offset/n_entries partially zero".into(),
));
}
// The claimed item count of the tail entry array must fit between its
// offset and the end of the arena.
if tail_entry_array_offset != 0 && tail_entry_array_n_entries > 0 {
let compact = (incompat & incompat::COMPACT) != 0;
let item_sz = entry_array_item_size(compact);
let max_obj_size = total.saturating_sub(tail_entry_array_offset);
if tail_entry_array_n_entries.saturating_mul(item_sz) > max_obj_size {
return Err(Error::InvalidFile(format!(
"tail_entry_array_n_entries ({}) * item_sz ({}) > region_size ({})",
tail_entry_array_n_entries, item_sz, max_obj_size
)));
}
}
let tail_entry_offset = from_le64(&header.tail_entry_offset);
if !offset_is_valid(tail_entry_offset, header_size, tail_object_offset) {
return Err(Error::InvalidFile(
"tail_entry_offset invalid".into(),
));
}
// Timestamp invariants: when a tail entry exists, head/tail realtime must
// be non-zero and all timestamps below 2^55; when there is none, all
// timestamp fields (and the tail boot id) must be zeroed.
let head_entry_realtime = from_le64(&header.head_entry_realtime);
let tail_entry_realtime = from_le64(&header.tail_entry_realtime);
let tail_entry_monotonic = from_le64(&header.tail_entry_monotonic);
const TS_UPPER: u64 = 1u64 << 55;
if tail_entry_offset > 0 {
let compat = from_le32(&header.compatible_flags);
if (compat & compat::TAIL_ENTRY_BOOT_ID) != 0 && header.tail_entry_boot_id == [0u8; 16] {
return Err(Error::InvalidFile(
"tail_entry set but tail_entry_boot_id is null".into(),
));
}
if head_entry_realtime == 0 || head_entry_realtime >= TS_UPPER {
return Err(Error::InvalidFile(
"tail_entry set but head_entry_realtime invalid".into(),
));
}
if tail_entry_realtime == 0 || tail_entry_realtime >= TS_UPPER {
return Err(Error::InvalidFile(
"tail_entry set but tail_entry_realtime invalid".into(),
));
}
if tail_entry_monotonic >= TS_UPPER {
return Err(Error::InvalidFile(
"tail_entry set but tail_entry_monotonic invalid".into(),
));
}
} else {
if head_entry_realtime != 0 || tail_entry_realtime != 0 || tail_entry_monotonic != 0 {
return Err(Error::InvalidFile(
"no tail_entry but timestamps are non-zero".into(),
));
}
let compat = from_le32(&header.compatible_flags);
if (compat & compat::TAIL_ENTRY_BOOT_ID) != 0 && header.tail_entry_boot_id != [0u8; 16] {
return Err(Error::InvalidFile(
"no tail_entry but tail_entry_boot_id is non-null".into(),
));
}
}
// Object counters: every per-type counter is bounded by n_objects, which
// itself cannot exceed what the arena could physically hold.
let n_objects = from_le64(&header.n_objects);
if n_objects > arena_size / OBJECT_HEADER_SIZE as u64 {
return Err(Error::InvalidFile(format!(
"n_objects ({}) impossibly large for arena_size ({})",
n_objects, arena_size
)));
}
let n_data = from_le64(&header.n_data);
let n_fields = from_le64(&header.n_fields);
let n_entry_arrays = from_le64(&header.n_entry_arrays);
let n_tags = from_le64(&header.n_tags);
if n_entries > n_objects {
return Err(Error::InvalidFile(format!(
"n_entries ({}) > n_objects ({})",
n_entries, n_objects
)));
}
if n_data > n_objects {
return Err(Error::InvalidFile(format!(
"n_data ({}) > n_objects ({})",
n_data, n_objects
)));
}
if n_fields > n_objects {
return Err(Error::InvalidFile(format!(
"n_fields ({}) > n_objects ({})",
n_fields, n_objects
)));
}
if n_entry_arrays > n_objects {
return Err(Error::InvalidFile(format!(
"n_entry_arrays ({}) > n_objects ({})",
n_entry_arrays, n_objects
)));
}
if n_tags > n_objects {
return Err(Error::InvalidFile(format!(
"n_tags ({}) > n_objects ({})",
n_tags, n_objects
)));
}
if tail_entry_array_n_entries > n_entries {
return Err(Error::InvalidFile(format!(
"tail_entry_array_n_entries ({}) > n_entries ({})",
tail_entry_array_n_entries, n_entries
)));
}
// Writable-only invariants: live state and non-empty hash tables, plus a
// machine-id match when the caller knows the current machine.
if writable {
if state == FileState::Archived as u8 {
return Err(Error::InvalidFile(
"cannot write to archived journal".into(),
));
}
if state != FileState::Offline as u8 && state != FileState::Online as u8 {
return Err(Error::InvalidFile(format!(
"unexpected state {} for writable journal",
state
)));
}
if data_ht_size == 0 {
return Err(Error::InvalidFile(
"writable mode but data hash table is empty".into(),
));
}
if field_ht_size == 0 {
return Err(Error::InvalidFile(
"writable mode but field hash table is empty".into(),
));
}
if let Some(mid) = current_machine_id {
// An all-zero machine id in the file is treated as "unset" and accepted.
if header.machine_id != [0u8; 16] && header.machine_id != mid {
return Err(Error::InvalidFile(
"journal file machine_id does not match current machine".into(),
));
}
}
}
Ok(())
}
/// Hashes a data payload the way this journal file expects: keyed
/// SipHash-2-4 (keys derived from the file id, split into two LE u64s)
/// when `keyed_hash` is set, otherwise the legacy unkeyed `hash64`.
pub fn journal_file_hash_data(data: &[u8], keyed_hash: bool, file_id: &[u8; 16]) -> u64 {
    if !keyed_hash {
        return hash64(data);
    }
    use siphasher::sip::SipHasher24;
    use std::hash::Hasher;
    let (lo, hi) = file_id.split_at(8);
    let k0 = u64::from_le_bytes(lo.try_into().unwrap());
    let k1 = u64::from_le_bytes(hi.try_into().unwrap());
    let mut hasher = SipHasher24::new_with_keys(k0, k1);
    hasher.write(data);
    hasher.finish()
}
/// Whether `field` is a syntactically valid journal field name:
/// 1..=64 bytes drawn from [A-Z0-9_], not starting with a digit, and not
/// starting with '_' unless `allow_protected` permits protected fields.
pub fn journal_field_valid(field: &[u8], allow_protected: bool) -> bool {
    let Some(&first) = field.first() else {
        return false;
    };
    if field.len() > 64 || first.is_ascii_digit() {
        return false;
    }
    if first == b'_' && !allow_protected {
        return false;
    }
    field
        .iter()
        .all(|&b| matches!(b, b'A'..=b'Z' | b'0'..=b'9' | b'_'))
}
/// On-disk size of one ENTRY item: 4 bytes in compact layout, otherwise
/// the regular `ENTRY_ITEM_SIZE`.
pub fn entry_item_size(compact: bool) -> u64 {
    match compact {
        true => 4,
        false => ENTRY_ITEM_SIZE as u64,
    }
}
/// On-disk size of one ENTRY_ARRAY slot: 4-byte offsets in compact
/// layout, 8-byte otherwise.
pub fn entry_array_item_size(compact: bool) -> u64 {
    match compact {
        true => 4,
        false => 8,
    }
}
pub fn data_payload_offset(compact: bool) -> u64 {
if compact {
DATA_OBJECT_HEADER_SIZE as u64 + 8
} else {
DATA_OBJECT_HEADER_SIZE as u64
}
}
/// Number of items an ENTRY object of `obj_size` holds (0 if the object is
/// smaller than its header).
pub fn journal_file_entry_n_items(obj_size: u64, compact: bool) -> u64 {
    obj_size.saturating_sub(ENTRY_OBJECT_HEADER_SIZE as u64) / entry_item_size(compact)
}
/// Number of slots an ENTRY_ARRAY object of `obj_size` holds (0 if the
/// object is smaller than its header).
pub fn entry_array_n_items(obj_size: u64, compact: bool) -> u64 {
    obj_size.saturating_sub(ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64) / entry_array_item_size(compact)
}
/// Number of HashItem buckets a hash-table object of `obj_size` holds
/// (0 if the object is smaller than its header).
pub fn journal_file_hash_table_n_items(obj_size: u64) -> u64 {
    obj_size.saturating_sub(OBJECT_HEADER_SIZE as u64) / HASH_ITEM_SIZE as u64
}
/// Orders entry items by their data-object offset (first tuple element);
/// the hash (second element) is ignored.
fn entry_item_cmp(a: &(u64, u64), b: &(u64, u64)) -> std::cmp::Ordering {
    u64::cmp(&a.0, &b.0)
}
/// Collapses consecutive items that reference the same data-object offset,
/// keeping the first occurrence. Callers sort by offset beforehand, so
/// this removes all duplicates.
fn remove_duplicate_entry_items(items: &mut Vec<(u64, u64)>) {
    items.dedup_by(|current, previous| current.0 == previous.0);
}
/// Advances a sequence number, wrapping back to 1 so 0 is never produced.
/// Note: inputs >= u64::MAX - 1 wrap early, so u64::MAX itself is never
/// emitted either (preserved from the original logic).
fn inc_seqnum(seqnum: u64) -> u64 {
    match seqnum.checked_add(1) {
        Some(next) if next != u64::MAX => next,
        _ => 1,
    }
}
/// Append-side state for a single journal file: the open handle, mirrors
/// of the on-disk header counters, identity ids, and bookkeeping needed
/// while writing.
pub struct JournalWriter {
// Backing file, opened read+write.
file: File,
// Current end of the allocated area (header_size + arena_size); the next
// object is appended here.
offset: u64,
// Bucket count and object-start offset of the data hash table.
data_ht_n: u64,
data_ht_offset: u64,
// Bucket count and object-start offset of the field hash table.
field_ht_n: u64,
field_ht_offset: u64,
// Offset of the first global entry-array object; 0 while empty.
entry_array_offset: u64,
// Total entries linked through the global entry-array chain.
global_n_entries: u64,
// Tail of the global entry-array chain as an (offset, count) pair.
// NOTE(review): inferred from walk_entry_array_chain() — confirm.
global_tail: Option<(u64, u64)>,
// Next sequence number to hand out, and the id of its series.
seqnum: u64,
seqnum_id: [u8; 16],
// Identities: current boot, machine, and this file.
boot_id: [u8; 16],
machine_id: [u8; 16],
file_id: [u8; 16],
// Keyed (SipHash) data hashing and COMPACT on-disk layout flags,
// mirroring the header's incompatible flags.
keyed_hash: bool,
compact: bool,
// NOTE(review): always set true in this chunk; semantics not visible here.
strict_order: bool,
// Selected compression algorithm and minimum payload size before it applies.
compression: Compression,
compress_threshold_bytes: u64,
// Mirrors of the header's object/entry counters.
n_objects: u64,
n_entries: u64,
n_data: u64,
n_fields: u64,
n_entry_arrays: u64,
// Head/tail entry timestamps (microseconds) and seqnums.
head_entry_realtime: u64,
tail_entry_realtime: u64,
// NOTE(review): inconsistently named — siblings use tail_entry_*; a
// rename would touch other methods in this file, so it is left as-is.
tailentry_monotonic: u64,
tail_entry_seqnum: u64,
head_entry_seqnum: u64,
// Offsets of the last object and last entry written.
tail_object_offset: u64,
tail_entry_offset: u64,
// Longest hash-collision chains observed so far.
data_hash_chain_depth: u64,
field_hash_chain_depth: u64,
// Size/retention limits for this file.
metrics: JournalMetrics,
// Original path, when known.
path: Option<String>,
// Whether the on-disk state byte is ONLINE / whether we archived the file.
online: bool,
archived: bool,
// tail_entry_boot_id recorded in the file before we opened it.
prev_boot_id: [u8; 16],
// Cache of data-object tail info keyed by hash; see DataTailCache.
data_tail_cache: DataTailCache,
// Async offline state machine (values are OfflineState as u8) and its
// worker thread.
offline_state: Arc<AtomicU8>,
offline_thread: Option<std::thread::JoinHandle<()>>,
// Optional post-change notification hook plus its rate-limit state.
post_change_callback: Option<PostChangeCallback>,
post_change_timer_period: u64,
last_post_change: u64,
}
impl JournalWriter {
/// Opens (creating if necessary) a journal file for read-write use. An
/// empty file is initialized from scratch; a non-empty one is validated
/// and resumed.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
    let path_ref = path.as_ref();
    let display_path = path_ref.to_string_lossy().into_owned();
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path_ref)?;
    let is_empty = file.metadata()?.len() == 0;
    let mut writer = if is_empty {
        Self::create_new(file)?
    } else {
        Self::open_existing(file)?
    };
    writer.path = Some(display_path);
    Ok(writer)
}
/// Creates a brand-new journal at `path` (truncating any existing
/// content) that continues the template's sequence-number series under
/// the same seqnum_id — used when rotating to a successor file.
pub fn open_with_template<P: AsRef<Path>>(path: P, template: &JournalWriter) -> Result<Self> {
    let path_ref = path.as_ref();
    let display_path = path_ref.to_string_lossy().into_owned();
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(path_ref)?;
    let mut writer = Self::create_new(file)?;
    writer.seqnum_id = template.seqnum_id;
    writer.tail_entry_seqnum = template.tail_entry_seqnum;
    writer.seqnum = inc_seqnum(template.tail_entry_seqnum);
    writer.path = Some(display_path);
    writer.write_header()?;
    Ok(writer)
}
/// Whether this file uses the COMPACT on-disk layout (4-byte entry and
/// entry-array items).
pub fn is_compact(&self) -> bool {
self.compact
}
/// Parses a systemd-style boolean string (case-insensitive); returns
/// `None` for anything unrecognized.
fn parse_boolean(s: &str) -> Option<bool> {
    let normalized = s.to_lowercase();
    if ["1", "yes", "true", "on"].contains(&normalized.as_str()) {
        Some(true)
    } else if ["0", "no", "false", "off"].contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
/// Keyed (SipHash) data hashing defaults to enabled; the
/// $SYSTEMD_JOURNAL_KEYED_HASH boolean can turn it off. Unparseable or
/// missing values keep the default.
fn keyed_hash_requested() -> bool {
    std::env::var("SYSTEMD_JOURNAL_KEYED_HASH")
        .ok()
        .and_then(|value| Self::parse_boolean(&value))
        .unwrap_or(true)
}
/// COMPACT layout defaults to enabled; the $SYSTEMD_JOURNAL_COMPACT
/// boolean can turn it off. Unparseable or missing values keep the default.
fn compact_mode_requested() -> bool {
    std::env::var("SYSTEMD_JOURNAL_COMPACT")
        .ok()
        .and_then(|value| Self::parse_boolean(&value))
        .unwrap_or(true)
}
/// Initializes a brand-new journal in `file` with the default 128 MiB
/// size cap (which sizes the data hash table).
fn create_new(file: File) -> Result<Self> {
Self::create_new_with_max_size(file, 128 * 1024 * 1024) }
/// Lays out and writes a fresh journal file: header, then the DATA and
/// FIELD hash-table objects (each 8-byte aligned, zero-padded between),
/// and returns a writer positioned at the end of the initialized arena.
fn create_new_with_max_size(mut file: File, max_size: u64) -> Result<Self> {
let keyed_hash = Self::keyed_hash_requested();
let compact = Self::compact_mode_requested();
// Bucket counts: data table scaled to the target file size, field table fixed.
let data_ht_n = Self::setup_data_hash_table_size(max_size);
let field_ht_n = DEFAULT_FIELD_HASH_TABLE_SIZE as u64;
// Data hash-table object starts immediately after the header ...
let data_ht_offset = HEADER_SIZE;
let data_ht_bytes = data_ht_n * HASH_ITEM_SIZE as u64;
let data_ht_actual_size = OBJECT_HEADER_SIZE as u64 + data_ht_bytes;
let data_ht_obj_bytes = align64(data_ht_actual_size);
// ... and the field hash-table object after it, 8-byte aligned.
let field_ht_offset = align64(data_ht_offset + data_ht_obj_bytes);
let field_ht_bytes = field_ht_n * HASH_ITEM_SIZE as u64;
let field_ht_actual_size = OBJECT_HEADER_SIZE as u64 + field_ht_bytes;
let field_ht_obj_bytes = align64(field_ht_actual_size);
let arena_end = align64(field_ht_offset + field_ht_obj_bytes);
// Fresh identities; the file id doubles as the seqnum-series id.
let file_id = *Uuid::new_v4().as_bytes();
let seqnum_id = file_id;
let mid = machine_id();
let boot_id = get_boot_id();
let header = build_header(
file_id,
mid,
seqnum_id,
boot_id,
HEADER_SIZE,
arena_end - HEADER_SIZE,
data_ht_offset,
data_ht_actual_size,
field_ht_offset,
field_ht_actual_size,
FileState::Online,
keyed_hash,
compact,
);
file.seek(SeekFrom::Start(0))?;
file.write_all(header_as_bytes(&header))?;
write_hash_table_object(&mut file, ObjectType::DataHashTable, data_ht_n)?;
// Zero-pad up to the field table, write it, then zero-pad the arena tail.
let cur = file.stream_position()?;
if cur < field_ht_offset {
write_zeros(&mut file, field_ht_offset - cur)?;
}
write_hash_table_object(&mut file, ObjectType::FieldHashTable, field_ht_n)?;
let cur = file.stream_position()?;
if cur < arena_end {
write_zeros(&mut file, arena_end - cur)?;
}
file.flush()?;
Ok(Self {
file,
offset: arena_end,
data_ht_n,
data_ht_offset,
field_ht_n,
field_ht_offset,
entry_array_offset: 0,
global_n_entries: 0,
global_tail: None,
seqnum: 1,
seqnum_id,
boot_id,
machine_id: mid,
file_id,
keyed_hash,
compact,
strict_order: true,
compression: compression_requested(),
compress_threshold_bytes: DEFAULT_COMPRESS_THRESHOLD,
// The two hash-table objects written above.
n_objects: 2, n_entries: 0,
n_data: 0,
n_fields: 0,
n_entry_arrays: 0,
head_entry_realtime: 0,
tail_entry_realtime: 0,
tailentry_monotonic: 0,
tail_entry_seqnum: 0,
head_entry_seqnum: 0,
tail_object_offset: field_ht_offset,
tail_entry_offset: 0,
data_hash_chain_depth: 0,
field_hash_chain_depth: 0,
metrics: JournalMetrics {
max_size,
min_size: JOURNAL_FILE_SIZE_MIN,
keep_free: 1024 * 1024, ..JournalMetrics::default()
},
path: None,
online: true,
archived: false,
prev_boot_id: [0u8; 16],
data_tail_cache: HashMap::new(),
offline_state: Arc::new(AtomicU8::new(OfflineState::Joined as u8)),
offline_thread: None,
post_change_callback: None,
post_change_timer_period: 0,
last_post_change: 0,
})
}
/// Resumes writing to an existing journal: reads and validates the
/// header, flips the on-disk state byte to ONLINE, and reconstructs the
/// in-memory bookkeeping (hash-table geometry, entry-array tail, counters).
fn open_existing(mut file: File) -> Result<Self> {
file.seek(SeekFrom::Start(0))?;
// Fixed-size header read. NOTE(review): 272 presumably equals
// size_of::<Header>() / HEADER_SIZE — confirm they stay in sync.
let mut hbuf = [0u8; 272];
file.read_exact(&mut hbuf)
.map_err(|_| Error::Truncated { offset: 0 })?;
// SAFETY assumption: hbuf is large enough for Header, read_unaligned has
// no alignment requirement, and Header is plain-old-data (any bit
// pattern valid) — verify_header below rejects nonsensical contents.
let h: Header = unsafe { std::ptr::read_unaligned(hbuf.as_ptr() as *const Header) };
let file_size = file.metadata()?.len();
// Machine-id match is only enforceable on Linux, where we can read it.
#[cfg(target_os = "linux")]
let mid = Some(machine_id());
#[cfg(not(target_os = "linux"))]
let mid: Option<[u8; 16]> = None;
verify_header(&h, file_size, true, mid)?;
let offset = from_le64(&h.header_size) + from_le64(&h.arena_size);
// The header stores offsets just past each table's ObjectHeader; keep
// the object-start offsets instead (presumably — matches create path).
let data_ht_offset =
from_le64(&h.data_hash_table_offset) - OBJECT_HEADER_SIZE as u64;
let data_ht_size = from_le64(&h.data_hash_table_size);
let field_ht_offset =
from_le64(&h.field_hash_table_offset) - OBJECT_HEADER_SIZE as u64;
let field_ht_size = from_le64(&h.field_hash_table_size);
let data_ht_n = data_ht_size / HASH_ITEM_SIZE as u64;
let field_ht_n = field_ht_size / HASH_ITEM_SIZE as u64;
// Mark the file ONLINE; the state byte sits at offset 16 (after the
// 8-byte signature and two 4-byte flag words — confirm against Header).
file.seek(SeekFrom::Start(16))?;
file.write_all(&[FileState::Online as u8])?;
let entry_array_offset = from_le64(&h.entry_array_offset);
let n_entries = from_le64(&h.n_entries);
let is_compact = (from_le32(&h.incompatible_flags) & incompat::COMPACT) != 0;
// Recover the tail of the global entry-array chain so appends can link on.
let global_tail = if entry_array_offset != 0 {
let (tail_off, tail_n) = walk_entry_array_chain(&mut file, entry_array_offset, is_compact)?;
Some((tail_off, tail_n))
} else {
None
};
let tail_entry_realtime = from_le64(&h.tail_entry_realtime);
let tailentry_monotonic = from_le64(&h.tail_entry_monotonic);
Ok(Self {
file,
offset,
data_ht_n,
data_ht_offset,
field_ht_n,
field_ht_offset,
entry_array_offset,
global_n_entries: n_entries,
global_tail,
// Continue the sequence-number series where the file left off.
seqnum: inc_seqnum(from_le64(&h.tail_entry_seqnum)),
seqnum_id: h.seqnum_id,
boot_id: get_boot_id(),
machine_id: h.machine_id,
file_id: h.file_id,
keyed_hash: (from_le32(&h.incompatible_flags) & incompat::KEYED_HASH) != 0,
compact: (from_le32(&h.incompatible_flags) & incompat::COMPACT) != 0,
strict_order: true, n_objects: from_le64(&h.n_objects),
n_entries,
n_data: from_le64(&h.n_data),
n_fields: from_le64(&h.n_fields),
n_entry_arrays: from_le64(&h.n_entry_arrays),
head_entry_realtime: from_le64(&h.head_entry_realtime),
tail_entry_realtime,
tailentry_monotonic,
tail_entry_seqnum: from_le64(&h.tail_entry_seqnum),
head_entry_seqnum: from_le64(&h.head_entry_seqnum),
tail_object_offset: from_le64(&h.tail_object_offset),
tail_entry_offset: from_le64(&h.tail_entry_offset),
data_hash_chain_depth: from_le64(&h.data_hash_chain_depth),
field_hash_chain_depth: from_le64(&h.field_hash_chain_depth),
metrics: JournalMetrics {
max_size: 128 * 1024 * 1024, min_size: JOURNAL_FILE_SIZE_MIN,
keep_free: 1024 * 1024, ..JournalMetrics::default()
},
compression: compression_requested(),
compress_threshold_bytes: DEFAULT_COMPRESS_THRESHOLD,
path: None,
online: true,
archived: false,
// Boot id of the file's previous tail entry, before we opened it.
prev_boot_id: h.tail_entry_boot_id,
data_tail_cache: HashMap::new(),
offline_state: Arc::new(AtomicU8::new(OfflineState::Joined as u8)),
offline_thread: None,
post_change_callback: None,
post_change_timer_period: 0,
last_post_change: 0,
})
}
/// Appends one entry stamped with the current timestamps and this
/// writer's boot id, returning the entry object's offset. Convenience
/// wrapper over `append_entry_seqnum` with no seqnum feedback.
pub fn append_entry<N: AsRef<[u8]>, V: AsRef<[u8]>>(
&mut self,
fields: &[(N, V)],
) -> Result<u64> {
self.append_entry_seqnum(fields, None)
}
/// Appends one entry stamped with "now" (realtime + monotonic) and this
/// writer's boot id. Returns the entry object's offset; when `seqnum` is
/// provided it is passed through to `journal_file_entry_seqnum`
/// (NOTE(review): presumably to report the assigned number — confirm).
///
/// # Errors
/// `Error::EmptyEntry` for an empty field list; `Error::InvalidFile` for
/// out-of-range timestamps; plus any I/O/corruption error from the append.
pub fn append_entry_seqnum<N: AsRef<[u8]>, V: AsRef<[u8]>>(
&mut self,
fields: &[(N, V)],
seqnum: Option<&mut u64>,
) -> Result<u64> {
if fields.is_empty() {
return Err(Error::EmptyEntry);
}
self.journal_file_set_online()?;
let realtime = realtime_now();
let monotonic = monotonic_now();
let boot_id = self.boot_id;
// Same 2^55 timestamp bound the on-disk validators use; realtime must
// additionally be non-zero.
const TS_UPPER: u64 = 1u64 << 55;
if realtime == 0 || realtime >= TS_UPPER {
return Err(Error::InvalidFile(format!(
"invalid realtime timestamp {}",
realtime
)));
}
if monotonic >= TS_UPPER {
return Err(Error::InvalidFile(format!(
"invalid monotonic timestamp {}",
monotonic
)));
}
let mut items: Vec<(u64, u64)> = Vec::with_capacity(fields.len());
let mut xor_hash: u64 = 0;
for (name, value) in fields {
let name = name.as_ref();
let value = value.as_ref();
validate_field_name(name)?;
// On-disk payload layout is "NAME=value".
let mut payload = Vec::with_capacity(name.len() + 1 + value.len());
payload.extend_from_slice(name);
payload.push(b'=');
payload.extend_from_slice(value);
let h = journal_file_hash_data(&payload, self.keyed_hash, &self.file_id);
// The entry XOR hash always uses the unkeyed hash64 even when data
// objects are keyed — presumably so it stays comparable across files
// with different keys; TODO confirm against the format spec.
if self.keyed_hash {
xor_hash ^= hash64(&payload);
} else {
xor_hash ^= h;
}
let (data_offset, is_new_data) = self.journal_file_append_data(&payload, h)?;
let field_offset = self.journal_file_append_field(name)?;
// Newly created data objects are pushed onto the head of their
// field's data list. The raw +32 byte offsets index fields inside
// the FIELD/DATA objects — NOTE(review): confirm against the object
// layouts in def.
if is_new_data {
let field_head_ptr = field_offset + 32; let old_head = self.read_u64_at(field_head_ptr)?;
self.write_u64_at(data_offset + 32, old_head)?; self.write_u64_at(field_head_ptr, data_offset)?;
}
items.push((data_offset, h));
}
// Entry items must be sorted by data offset with duplicates collapsed
// before being written out.
items.sort_by(entry_item_cmp);
remove_duplicate_entry_items(&mut items);
let seqnum_val = self.journal_file_entry_seqnum(seqnum);
let entry_offset =
self.journal_file_append_entry_internal(seqnum_val, realtime, monotonic, &boot_id, xor_hash, &items)?;
self.journal_file_link_entry(entry_offset, &items)?;
self.journal_file_post_change()?;
Ok(entry_offset)
}
/// Appends a new entry using caller-supplied timestamps and boot ID
/// instead of the current system clocks, without an external seqnum
/// counter. Returns the offset of the written ENTRY object.
pub fn append_entry_with_ts<N: AsRef<[u8]>, V: AsRef<[u8]>>(
    &mut self,
    realtime: u64,
    monotonic: u64,
    boot_id: &[u8; 16],
    fields: &[(N, V)],
) -> Result<u64> {
    self.append_entry_with_ts_seqnum(realtime, monotonic, boot_id, fields, None)
}
/// Appends a new entry with caller-supplied timestamps and boot ID and
/// an optional shared sequence-number counter.
///
/// Same write path as `append_entry_seqnum`, except the time source is
/// the caller's and the boot ID must be non-zero.
///
/// # Errors
/// Fails when `fields` is empty, a field name is invalid, a timestamp
/// is out of range, the boot ID is all-zero, or on I/O failure.
pub fn append_entry_with_ts_seqnum<N: AsRef<[u8]>, V: AsRef<[u8]>>(
    &mut self,
    realtime: u64,
    monotonic: u64,
    boot_id: &[u8; 16],
    fields: &[(N, V)],
    seqnum: Option<&mut u64>,
) -> Result<u64> {
    if fields.is_empty() {
        return Err(Error::EmptyEntry);
    }
    // Mark the file ONLINE before mutating it.
    self.journal_file_set_online()?;
    // Timestamps must fit below 2^55; realtime 0 is rejected as "unset".
    const TS_UPPER: u64 = 1u64 << 55;
    if realtime == 0 || realtime >= TS_UPPER {
        return Err(Error::InvalidFile(format!(
            "invalid realtime timestamp {}",
            realtime
        )));
    }
    if monotonic >= TS_UPPER {
        return Err(Error::InvalidFile(format!(
            "invalid monotonic timestamp {}",
            monotonic
        )));
    }
    if boot_id == &[0u8; 16] {
        return Err(Error::InvalidFile("empty boot ID".into()));
    }
    // Collected (data_offset, hash) items for the ENTRY object.
    let mut items: Vec<(u64, u64)> = Vec::with_capacity(fields.len());
    let mut xor_hash: u64 = 0;
    for (name, value) in fields {
        let name = name.as_ref();
        let value = value.as_ref();
        validate_field_name(name)?;
        // On-disk data payload is "NAME=value".
        let mut payload = Vec::with_capacity(name.len() + 1 + value.len());
        payload.extend_from_slice(name);
        payload.push(b'=');
        payload.extend_from_slice(value);
        let h = journal_file_hash_data(&payload, self.keyed_hash, &self.file_id);
        // The entry XOR hash uses the plain (unkeyed) hash64 even when
        // keyed hashing is active for the hash tables.
        if self.keyed_hash {
            xor_hash ^= hash64(&payload);
        } else {
            xor_hash ^= h;
        }
        let (data_offset, is_new_data) = self.journal_file_append_data(&payload, h)?;
        let field_offset = self.journal_file_append_field(name)?;
        if is_new_data {
            // Push the new data object onto the field's data list:
            // field+32 is the list head, data+32 the next-field link.
            let field_head_ptr = field_offset + 32;
            let old_head = self.read_u64_at(field_head_ptr)?;
            self.write_u64_at(data_offset + 32, old_head)?;
            self.write_u64_at(field_head_ptr, data_offset)?;
        }
        items.push((data_offset, h));
    }
    // Items must be sorted and unique inside an entry.
    items.sort_by(entry_item_cmp);
    remove_duplicate_entry_items(&mut items);
    let seqnum_val = self.journal_file_entry_seqnum(seqnum);
    let entry_offset =
        self.journal_file_append_entry_internal(seqnum_val, realtime, monotonic, boot_id, xor_hash, &items)?;
    self.journal_file_link_entry(entry_offset, &items)?;
    self.journal_file_post_change()?;
    Ok(entry_offset)
}
/// Writes the in-memory header back to disk and flushes the file's
/// userspace write buffer. Note: `File::flush` does not force data to
/// stable storage; use an explicit sync for durability.
pub fn flush(&mut self) -> Result<()> {
    self.write_header()?;
    self.file.flush()?;
    Ok(())
}
/// Returns the current logical append offset, i.e. the number of bytes
/// of the file occupied by written objects (not the allocated size).
pub fn file_size(&self) -> u64 {
    self.offset
}
/// Returns the number of ENTRY objects written to this file.
pub fn n_entries(&self) -> u64 {
    self.n_entries
}
/// Returns the total number of objects (of all types) in this file.
pub fn n_objects(&self) -> u64 {
    self.n_objects
}
/// Returns the filesystem path of this journal file, if one is known.
pub fn path(&self) -> Option<&str> {
    self.path.as_deref()
}
/// Sets the minimum payload size (bytes) above which data objects are
/// considered for compression. Values below the hard floor
/// `MIN_COMPRESS_THRESHOLD` are clamped up to it.
pub fn set_compress_threshold_bytes(&mut self, bytes: u64) {
    self.compress_threshold_bytes = bytes.max(MIN_COMPRESS_THRESHOLD);
}
/// Returns the size/disk-usage limits currently configured for this file.
pub fn metrics(&self) -> &JournalMetrics {
    &self.metrics
}
/// Replaces the size/disk-usage limits used by subsequent allocations.
pub fn set_metrics(&mut self, metrics: JournalMetrics) {
    self.metrics = metrics;
}
/// Renames the journal file into the archived naming scheme
/// `<base>@<seqnum-id-hex>-<head-seqnum>-<head-realtime>.journal` and
/// marks this writer as archived.
///
/// Returns `Ok(Some(old_path))` on success and `Ok(None)` when no path
/// is associated with the file. On failure the original path is put
/// back so the writer stays usable.
pub fn journal_file_archive(&mut self) -> Result<Option<String>> {
    let old_path = match self.path.take() {
        Some(p) => p,
        None => return Ok(None),
    };
    // Files opened via /proc/self/fd have no stable directory entry to rename.
    if old_path.starts_with("/proc/self/fd") {
        self.path = Some(old_path);
        return Err(Error::InvalidFile(
            "cannot archive journal opened via /proc/self/fd".into(),
        ));
    }
    if !old_path.ends_with(".journal") {
        self.path = Some(old_path);
        return Err(Error::InvalidFile(
            "cannot archive: path does not end with .journal".into(),
        ));
    }
    // Strip the 8-byte ".journal" suffix to obtain the base name.
    let base = &old_path[..old_path.len() - 8];
    let seqnum_id_hex: String = self.seqnum_id.iter().map(|b| format!("{:02x}", b)).collect();
    let new_path = format!(
        "{}@{}-{:016x}-{:016x}.journal",
        base, seqnum_id_hex, self.head_entry_seqnum, self.head_entry_realtime
    );
    match std::fs::rename(&old_path, &new_path) {
        Ok(()) => {}
        // Source already gone (moved/deleted concurrently): treat as done.
        Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => {
            self.path = Some(old_path);
            return Err(Error::Io(e));
        }
    }
    // NOTE(review): `online` is set to true while archiving — confirm
    // this is intended (an archived file would usually go offline).
    self.online = true;
    self.archived = true;
    // Best-effort fsync of the parent directory so the rename is durable.
    if let Some(parent) = std::path::Path::new(&new_path).parent() {
        if let Ok(dir) = std::fs::File::open(parent) {
            let _ = dir.sync_all();
        }
    }
    self.path = Some(new_path);
    Ok(Some(old_path))
}
fn journal_file_set_offline_thread_join(&mut self) -> Result<()> {
if let Some(handle) = self.offline_thread.take() {
handle
.join()
.map_err(|_| Error::InvalidFile("offline thread panicked".into()))?;
}
Ok(())
}
/// Transitions the file to the ONLINE state: joins any pending offline
/// thread, then writes the state byte (header offset 16) and syncs.
/// No-op when the file is already online.
fn journal_file_set_online(&mut self) -> Result<()> {
    self.journal_file_set_offline_thread_join()?;
    if self.online {
        return Ok(());
    }
    // State byte lives at offset 16 of the header.
    self.file.seek(SeekFrom::Start(16))?;
    self.file.write_all(&[FileState::Online as u8])?;
    self.file.sync_all()?;
    self.online = true;
    self.offline_state
        .store(OfflineState::Joined as u8, Ordering::SeqCst);
    Ok(())
}
#[allow(dead_code)]
/// Transitions the file to the OFFLINE state: syncs all pending data
/// first, then writes the state byte (header offset 16) and syncs
/// again, so readers only see OFFLINE once the contents are durable.
/// No-op when the file is already offline.
fn journal_file_set_offline(&mut self) -> Result<()> {
    self.journal_file_set_offline_thread_join()?;
    if !self.online {
        return Ok(());
    }
    self.online = false;
    // Sync data before flipping the state byte.
    self.file.sync_all()?;
    self.file.seek(SeekFrom::Start(16))?;
    self.file.write_all(&[FileState::Offline as u8])?;
    self.file.sync_all()?;
    self.offline_state
        .store(OfflineState::Done as u8, Ordering::SeqCst);
    Ok(())
}
/// Returns the sequence number for the next entry and advances the
/// internal counter.
///
/// When an external counter is supplied, the larger of (external + 1)
/// and our own next value wins, and the external counter is updated —
/// keeping multiple files that share a counter strictly ordered.
fn journal_file_entry_seqnum(&mut self, seqnum: Option<&mut u64>) -> u64 {
    let mut next = self.seqnum;
    if let Some(ext) = seqnum {
        // inc_seqnum advances the counter (wrap semantics defined elsewhere).
        next = std::cmp::max(inc_seqnum(*ext), next);
        *ext = next;
    }
    // The first entry ever written fixes the head seqnum.
    if self.head_entry_seqnum == 0 {
        self.head_entry_seqnum = next;
    }
    self.seqnum = inc_seqnum(next);
    next
}
/// Ensures the backing file covers `offset + size` bytes, growing it in
/// 8 MiB increments when needed, and keeps the header's arena size in
/// sync.
///
/// Enforces the configured `max_size`, the 4 GiB offset limit of
/// compact mode, and (on Linux) the `keep_free` disk-space reserve.
fn journal_file_allocate(&mut self, offset: u64, size: u64) -> Result<()> {
    const FILE_SIZE_INCREASE: u64 = 8 * 1024 * 1024;
    // Compact mode stores 32-bit offsets, so the file must stay < 4 GiB.
    const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
    if size > u64::MAX - offset {
        return Err(Error::InvalidFile(format!(
            "offset + size overflow ({} + {})",
            offset, size
        )));
    }
    let new_end = offset + size;
    // max_size == u64::MAX means "unlimited".
    if self.metrics.max_size != u64::MAX && new_end > self.metrics.max_size {
        return Err(Error::FileTooLarge(format!(
            "file would exceed max_size ({} > {})",
            new_end, self.metrics.max_size
        )));
    }
    if self.compact && new_end > JOURNAL_COMPACT_SIZE_MAX {
        return Err(Error::FileTooLarge(format!(
            "compact mode file would exceed 4GB ({} > {})",
            new_end, JOURNAL_COMPACT_SIZE_MAX
        )));
    }
    let old_size = self.file.metadata()?.len();
    // Already large enough: nothing to do.
    if new_end <= old_size {
        return Ok(());
    }
    // Refuse to grow past min_size when that would eat into the
    // configured free-space reserve on the filesystem.
    #[cfg(target_os = "linux")]
    if new_end > self.metrics.min_size && self.metrics.keep_free != u64::MAX {
        use std::os::unix::io::AsRawFd;
        let mut svfs: libc::statvfs = unsafe { std::mem::zeroed() };
        if unsafe { libc::fstatvfs(self.file.as_raw_fd(), &mut svfs) } == 0 {
            let available = (svfs.f_bfree as u64).saturating_mul(svfs.f_bsize as u64).saturating_sub(self.metrics.keep_free);
            if new_end - old_size > available {
                return Err(Error::InvalidFile("not enough free space".into()));
            }
        }
    }
    // Round up to the next 8 MiB step, then clamp to max_size
    // (new_end itself was already verified to fit above).
    let mut new_size = ((new_end + FILE_SIZE_INCREASE - 1) / FILE_SIZE_INCREASE) * FILE_SIZE_INCREASE;
    if self.metrics.max_size > 0 && new_size > self.metrics.max_size {
        new_size = self.metrics.max_size;
    }
    #[cfg(target_os = "linux")]
    {
        let fd = {
            use std::os::unix::io::AsRawFd;
            self.file.as_raw_fd()
        };
        // posix_fallocate reserves real blocks; it reports errors —
        // including EINTR — via its return value, not errno.
        let mut r;
        loop {
            r = unsafe {
                libc::posix_fallocate(fd, old_size as libc::off_t, (new_size - old_size) as libc::off_t)
            };
            if r != libc::EINTR {
                break;
            }
        }
        if r != 0 {
            return Err(Error::Io(std::io::Error::from_raw_os_error(r)));
        }
    }
    #[cfg(not(target_os = "linux"))]
    {
        self.file.set_len(new_size)?;
    }
    // Header field "arena size" = bytes available after the static header.
    const ARENA_SIZE_OFFSET: u64 = 96;
    self.write_u64_at(ARENA_SIZE_OFFSET, new_size - HEADER_SIZE)?;
    Ok(())
}
#[allow(dead_code)]
/// Appends a bare object of `obj_type` with `actual_size` bytes at the
/// current append offset: writes the generic object header, zero-fills
/// the body up to 64-bit alignment, and bumps the object counters.
/// Returns the object's offset.
fn journal_file_append_object(
    &mut self,
    obj_type: ObjectType,
    actual_size: u64,
) -> Result<u64> {
    let obj_offset = self.offset;
    debug_assert!(valid64(obj_offset));
    // Objects are stored 64-bit aligned; the header records actual_size.
    let aligned_size = align64(actual_size);
    let hdr = ObjectHeader {
        object_type: obj_type as u8,
        flags: 0,
        reserved: [0; 6],
        size: le64(actual_size),
    };
    self.file.seek(SeekFrom::Start(obj_offset))?;
    // SAFETY: serializes the raw in-memory bytes of the header struct;
    // assumes the C-compatible layout used by all append paths.
    let hdr_bytes = unsafe {
        std::slice::from_raw_parts(
            &hdr as *const ObjectHeader as *const u8,
            OBJECT_HEADER_SIZE,
        )
    };
    self.file.write_all(hdr_bytes)?;
    // Zero-fill everything after the header, including alignment padding.
    let remaining = aligned_size - OBJECT_HEADER_SIZE as u64;
    write_zeros(&mut self.file, remaining)?;
    self.offset += aligned_size;
    self.tail_object_offset = obj_offset;
    self.n_objects += 1;
    Ok(obj_offset)
}
/// Derives the number of data hash-table items from the configured
/// maximum file size, never going below the compiled-in default.
pub fn setup_data_hash_table_size(max_size: u64) -> u64 {
    let scaled = max_size * 4 / 768 / 3;
    std::cmp::max(scaled, DEFAULT_DATA_HASH_TABLE_SIZE as u64)
}
/// Returns the fixed number of field hash-table items; unlike the data
/// table, the field table is not scaled with the file size.
pub fn setup_field_hash_table_size() -> u64 {
    DEFAULT_FIELD_HASH_TABLE_SIZE as u64
}
/// Reads a little-endian u64 from the file at absolute `offset`.
fn read_u64_at(&mut self, offset: u64) -> Result<u64> {
    self.file.seek(SeekFrom::Start(offset))?;
    let mut buf = [0u8; 8];
    self.file.read_exact(&mut buf)?;
    Ok(u64::from_le_bytes(buf))
}
/// Writes `value` as a little-endian u64 at absolute `offset`.
fn write_u64_at(&mut self, offset: u64, value: u64) -> Result<()> {
    self.file.seek(SeekFrom::Start(offset))?;
    self.file.write_all(&le64(value))?;
    Ok(())
}
/// Sets `flag` in the header's incompatible-flags word (byte offset 12)
/// unless it is already set, avoiding a redundant header rewrite.
fn set_incompatible_flag(&mut self, flag: u32) -> Result<()> {
    // Read the current 32-bit flags word.
    self.file.seek(SeekFrom::Start(12))?;
    let mut buf = [0u8; 4];
    self.file.read_exact(&mut buf)?;
    let current = from_le32(&buf);
    // Bit already present — nothing to write.
    if current & flag != 0 {
        return Ok(());
    }
    self.file.seek(SeekFrom::Start(12))?;
    self.file.write_all(&le32(current | flag))?;
    Ok(())
}
/// Attempts to compress `payload` with the configured codec.
///
/// Returns `(bytes, object_flag, incompatible_flag)`. A compressed
/// result is returned only when it is strictly smaller than the input;
/// otherwise the payload is returned unchanged with zero flags.
fn try_compress_payload(&self, payload: &[u8]) -> (Vec<u8>, u8, u32) {
    match self.compression {
        #[cfg(feature = "zstd-compression")]
        Compression::Zstd => {
            // Level 0 selects the zstd library's default level.
            if let Ok(compressed) =
                zstd::encode_all(std::io::Cursor::new(payload), 0)
            {
                if compressed.len() < payload.len() {
                    return (
                        compressed,
                        obj_flags::COMPRESSED_ZSTD,
                        incompat::COMPRESSED_ZSTD,
                    );
                }
            }
        }
        #[cfg(feature = "xz-compression")]
        Compression::Xz => {
            use std::io::Write as _;
            let mut encoder =
                xz2::write::XzEncoder::new(Vec::new(), 6);
            if encoder.write_all(payload).is_ok() {
                if let Ok(compressed) = encoder.finish() {
                    if compressed.len() < payload.len() {
                        return (
                            compressed,
                            obj_flags::COMPRESSED_XZ,
                            incompat::COMPRESSED_XZ,
                        );
                    }
                }
            }
        }
        #[cfg(feature = "lz4-compression")]
        Compression::Lz4 => {
            // LZ4 blocks are prefixed with the 8-byte LE uncompressed
            // size; account for that when comparing sizes.
            let compressed_block = lz4_flex::compress(payload);
            let total_len = 8 + compressed_block.len();
            if total_len < payload.len() {
                let mut buf = Vec::with_capacity(total_len);
                buf.extend_from_slice(&(payload.len() as u64).to_le_bytes());
                buf.extend_from_slice(&compressed_block);
                return (
                    buf,
                    obj_flags::COMPRESSED_LZ4,
                    incompat::COMPRESSED_LZ4,
                );
            }
        }
        Compression::None => {}
    }
    // Compression disabled, failed, or not worthwhile: store as-is.
    (payload.to_vec(), 0, 0)
}
/// Reads exactly `n` bytes from the file at absolute `offset`.
fn read_bytes_at(&mut self, offset: u64, n: usize) -> Result<Vec<u8>> {
    self.file.seek(SeekFrom::Start(offset))?;
    let mut buf = vec![0u8; n];
    self.file.read_exact(&mut buf)?;
    Ok(buf)
}
/// Links a freshly written FIELD object at `obj_offset` into the field
/// hash table `bucket` chain and bumps the field counters.
fn journal_file_link_field(
    &mut self,
    obj_offset: u64,
    bucket: u64,
) -> Result<()> {
    // Clear the object's next_hash_offset (+24) and head_data_offset (+32).
    self.write_u64_at(obj_offset + 24, 0)?; self.write_u64_at(obj_offset + 32, 0)?;
    let ht_items_start = self.field_ht_offset + OBJECT_HEADER_SIZE as u64;
    let item_offset = ht_items_start + bucket * HASH_ITEM_SIZE as u64;
    let mut item = read_hash_item(&mut self.file, item_offset)?;
    let head = from_le64(&item.head_hash_offset);
    let mut depth: u64 = 0;
    if head == 0 {
        // Empty bucket: the new object becomes the chain head.
        item.head_hash_offset = le64(obj_offset);
    } else {
        // Append to the chain: patch the old tail's next pointer, and
        // measure the resulting chain depth for the header statistic.
        let tail = from_le64(&item.tail_hash_offset);
        self.write_u64_at(tail + 24, obj_offset)?;
        depth = self.count_field_chain_depth(bucket);
    }
    item.tail_hash_offset = le64(obj_offset);
    write_hash_item(&mut self.file, item_offset, &item)?;
    // Track the deepest chain seen so far.
    if depth > self.field_hash_chain_depth {
        self.field_hash_chain_depth = depth;
    }
    self.n_fields += 1;
    Ok(())
}
/// Links a freshly written DATA object at `obj_offset` into the data
/// hash table `bucket` chain and bumps the data counters.
fn journal_file_link_data(
    &mut self,
    obj_offset: u64,
    bucket: u64,
) -> Result<()> {
    // Clear the object's link fields: next_hash (+24), next_field (+32),
    // entry_offset (+40), entry_array_offset (+48), n_entries (+56).
    self.write_u64_at(obj_offset + 24, 0)?; self.write_u64_at(obj_offset + 32, 0)?; self.write_u64_at(obj_offset + 40, 0)?; self.write_u64_at(obj_offset + 48, 0)?; self.write_u64_at(obj_offset + 56, 0)?;
    let ht_items_start = self.data_ht_offset + OBJECT_HEADER_SIZE as u64;
    let item_offset = ht_items_start + bucket * HASH_ITEM_SIZE as u64;
    let mut item = read_hash_item(&mut self.file, item_offset)?;
    let head = from_le64(&item.head_hash_offset);
    let mut depth: u64 = 0;
    if head == 0 {
        // Empty bucket: the new object becomes the chain head.
        item.head_hash_offset = le64(obj_offset);
    } else {
        // Append to the chain: patch the old tail's next pointer, and
        // measure the resulting chain depth for the header statistic.
        let tail = from_le64(&item.tail_hash_offset);
        self.write_u64_at(tail + 24, obj_offset)?;
        depth = self.count_data_chain_depth(bucket);
    }
    item.tail_hash_offset = le64(obj_offset);
    write_hash_item(&mut self.file, item_offset, &item)?;
    // Track the deepest chain seen so far.
    if depth > self.data_hash_chain_depth {
        self.data_hash_chain_depth = depth;
    }
    self.n_data += 1;
    Ok(())
}
/// Counts the number of DATA objects chained in hash bucket `bucket`
/// by walking next_hash pointers (object offset + 24). Read errors end
/// the walk; the count so far is returned.
fn count_data_chain_depth(&mut self, bucket: u64) -> u64 {
    let ht_items_start = self.data_ht_offset + OBJECT_HEADER_SIZE as u64;
    let item_offset = ht_items_start + bucket * HASH_ITEM_SIZE as u64;
    let mut cursor = match self.read_u64_at(item_offset) {
        Ok(head) => head,
        Err(_) => return 0,
    };
    let mut depth: u64 = 0;
    while cursor != 0 {
        depth += 1;
        // Defensive cap so a corrupted (cyclic) chain cannot spin forever.
        if depth > 1_000_000 {
            break;
        }
        cursor = match self.read_u64_at(cursor + 24) {
            Ok(next) => next,
            Err(_) => break,
        };
    }
    depth
}
/// Counts the number of FIELD objects chained in hash bucket `bucket`
/// by walking next_hash pointers (object offset + 24). Read errors end
/// the walk; the count so far is returned.
fn count_field_chain_depth(&mut self, bucket: u64) -> u64 {
    let item_offset =
        self.field_ht_offset + OBJECT_HEADER_SIZE as u64 + bucket * HASH_ITEM_SIZE as u64;
    // A failed head read counts as an empty chain.
    let mut p = self.read_u64_at(item_offset).unwrap_or(0);
    let mut depth: u64 = 0;
    loop {
        if p == 0 {
            break;
        }
        depth += 1;
        // Defensive cap so a corrupted (cyclic) chain cannot spin forever.
        if depth > 1_000_000 {
            break;
        }
        // A failed link read terminates the walk on the next iteration.
        p = self.read_u64_at(p + 24).unwrap_or(0);
    }
    depth
}
/// Looks up an existing FIELD object whose payload equals `field`,
/// using its precomputed `hash` to pick the bucket. Returns the object
/// offset, or `None` when no matching object exists.
fn journal_file_find_field_object_with_hash(
    &mut self,
    field: &[u8],
    hash: u64,
) -> Result<Option<u64>> {
    // No table means nothing can be found (also avoids `hash % 0`).
    if self.field_ht_n == 0 {
        return Ok(None);
    }
    let bucket = hash % self.field_ht_n;
    let ht_items_start = self.field_ht_offset + OBJECT_HEADER_SIZE as u64;
    let item_offset = ht_items_start + bucket * HASH_ITEM_SIZE as u64;
    let mut p = self.read_u64_at(item_offset)?;
    let mut depth: u32 = 0;
    // Walk the bucket chain via next_hash pointers (offset + 24).
    while p > 0 {
        depth += 1;
        if depth > 1_000_000 {
            return Err(Error::InvalidFile(
                "field hash chain loop detected (depth exceeded 1,000,000)".into(),
            ));
        }
        // Cheap checks first: stored hash (+16), then object size (+8),
        // then the actual name bytes.
        let stored_hash = self.read_u64_at(p + 16)?;
        if stored_hash != hash {
            p = self.read_u64_at(p + 24)?;
            continue;
        }
        let obj_size = self.read_u64_at(p + 8)?;
        let expected_size = FIELD_OBJECT_HEADER_SIZE as u64 + field.len() as u64;
        if obj_size != expected_size {
            p = self.read_u64_at(p + 24)?;
            continue;
        }
        let disk_name =
            self.read_bytes_at(p + FIELD_OBJECT_HEADER_SIZE as u64, field.len())?;
        if disk_name == field {
            return Ok(Some(p));
        }
        p = self.read_u64_at(p + 24)?;
    }
    Ok(None)
}
/// Looks up an existing DATA object whose (decompressed) payload equals
/// `data`, using its precomputed `hash` to pick the bucket. Returns the
/// object offset, or `None` when no matching object exists.
///
/// Compressed candidates are decompressed for comparison; an object
/// compressed with a codec whose feature is disabled is an error.
fn journal_file_find_data_object_with_hash(
    &mut self,
    data: &[u8],
    hash: u64,
) -> Result<Option<u64>> {
    // No table means nothing can be found (also avoids `hash % 0`).
    if self.data_ht_n == 0 {
        return Ok(None);
    }
    let bucket = hash % self.data_ht_n;
    let ht_items_start = self.data_ht_offset + OBJECT_HEADER_SIZE as u64;
    let item_offset = ht_items_start + bucket * HASH_ITEM_SIZE as u64;
    let mut p = self.read_u64_at(item_offset)?;
    let mut depth: u32 = 0;
    // Walk the bucket chain via next_hash pointers (offset + 24).
    while p > 0 {
        depth += 1;
        if depth > 1_000_000 {
            return Err(Error::InvalidFile(
                "data hash chain loop detected (depth exceeded 1,000,000)".into(),
            ));
        }
        // Cheap check first: the stored hash at object offset + 16.
        let stored_hash = self.read_u64_at(p + 16)?;
        if stored_hash != hash {
            p = self.read_u64_at(p + 24)?;
            continue;
        }
        // Object flags byte (offset + 1) carries the compression bits.
        let flags_byte = {
            self.file.seek(SeekFrom::Start(p + 1))?;
            let mut buf = [0u8; 1];
            self.file.read_exact(&mut buf)?;
            buf[0]
        };
        let obj_size = self.read_u64_at(p + 8)?;
        let poffset = data_payload_offset(self.compact);
        if obj_size < poffset {
            return Err(Error::CorruptObject {
                offset: p,
                reason: format!(
                    "data object size {} smaller than payload offset {}",
                    obj_size, poffset
                ),
            });
        }
        let payload_len = obj_size - poffset;
        let compressed_flags = flags_byte & obj_flags::COMPRESSED_MASK;
        if compressed_flags != 0 {
            // Compressed candidate: decompress, then compare bytes.
            let raw = self.read_bytes_at(p + poffset, payload_len as usize)?;
            let decompressed = if (flags_byte & obj_flags::COMPRESSED_ZSTD) != 0 {
                #[cfg(feature = "zstd-compression")]
                { zstd::decode_all(raw.as_slice()).ok() }
                #[cfg(not(feature = "zstd-compression"))]
                { None::<Vec<u8>> }
            } else if (flags_byte & obj_flags::COMPRESSED_XZ) != 0 {
                #[cfg(feature = "xz-compression")]
                {
                    use std::io::Read as _;
                    let mut decoder = xz2::read::XzDecoder::new(raw.as_slice());
                    let mut buf = Vec::new();
                    decoder.read_to_end(&mut buf).ok().map(|_| buf)
                }
                #[cfg(not(feature = "xz-compression"))]
                { None::<Vec<u8>> }
            } else if (flags_byte & obj_flags::COMPRESSED_LZ4) != 0 {
                #[cfg(feature = "lz4-compression")]
                {
                    // LZ4 payloads carry an 8-byte LE uncompressed-size prefix.
                    if raw.len() >= 8 {
                        let uncompressed_size =
                            u64::from_le_bytes(raw[..8].try_into().unwrap()) as usize;
                        lz4_flex::decompress(&raw[8..], uncompressed_size).ok()
                    } else {
                        None
                    }
                }
                #[cfg(not(feature = "lz4-compression"))]
                { None::<Vec<u8>> }
            } else {
                None
            };
            // None here means either decompression failed or the codec's
            // feature is not compiled in.
            let dec = decompressed.ok_or_else(|| {
                Error::Decompression(format!(
                    "failed to decompress data object at offset {}",
                    p
                ))
            })?;
            if dec == data {
                return Ok(Some(p));
            }
            p = self.read_u64_at(p + 24)?;
            continue;
        }
        // Uncompressed candidate: sizes must match before comparing bytes.
        if payload_len as usize != data.len() {
            p = self.read_u64_at(p + 24)?;
            continue;
        }
        let disk_payload = self.read_bytes_at(p + poffset, data.len())?;
        if disk_payload == data {
            return Ok(Some(p));
        }
        p = self.read_u64_at(p + 24)?;
    }
    Ok(None)
}
fn journal_file_append_field(&mut self, field: &[u8]) -> Result<u64> {
let h = journal_file_hash_data(field, self.keyed_hash, &self.file_id);
if let Some(off) = self.journal_file_find_field_object_with_hash(field, h)? {
return Ok(off);
}
let bucket = h % self.field_ht_n;
let actual_size = FIELD_OBJECT_HEADER_SIZE as u64 + field.len() as u64;
let total_size = align64(actual_size);
let obj_offset = self.offset;
self.journal_file_allocate(obj_offset, total_size)?;
let field_hdr = FieldObjectHeader {
object: ObjectHeader {
object_type: ObjectType::Field as u8,
flags: 0,
reserved: [0; 6],
size: le64(actual_size), },
hash: le64(h),
next_hash_offset: le64(0),
head_data_offset: le64(0),
};
self.file.seek(SeekFrom::Start(obj_offset))?;
let hdr_bytes = unsafe {
std::slice::from_raw_parts(
&field_hdr as *const FieldObjectHeader as *const u8,
FIELD_OBJECT_HEADER_SIZE,
)
};
self.file.write_all(hdr_bytes)?;
self.file.write_all(field)?;
let written = FIELD_OBJECT_HEADER_SIZE as u64 + field.len() as u64;
if total_size > written {
write_zeros(&mut self.file, total_size - written)?;
}
self.offset += total_size;
self.tail_object_offset = obj_offset;
self.n_objects += 1;
self.journal_file_link_field(obj_offset, bucket)?;
Ok(obj_offset)
}
fn journal_file_append_data(
&mut self,
payload: &[u8],
h: u64,
) -> Result<(u64, bool)> {
if payload.is_empty() {
return Err(Error::InvalidFile("empty data payload".into()));
}
if let Some(off) = self.journal_file_find_data_object_with_hash(payload, h)? {
return Ok((off, false));
}
if !payload.contains(&b'=') {
return Err(Error::InvalidFile("data payload missing '=' separator".into()));
}
let bucket = h % self.data_ht_n;
let actual_size = data_payload_offset(self.compact) + payload.len() as u64;
let total_size = align64(actual_size);
let obj_offset = self.offset;
self.journal_file_allocate(obj_offset, total_size)?;
let data_hdr = DataObjectHeader {
object: ObjectHeader {
object_type: ObjectType::Data as u8,
flags: 0,
reserved: [0; 6],
size: le64(actual_size), },
hash: le64(h),
next_hash_offset: le64(0),
next_field_offset: le64(0),
entry_offset: le64(0),
entry_array_offset: le64(0),
n_entries: le64(0),
};
self.file.seek(SeekFrom::Start(obj_offset))?;
let hdr_bytes = unsafe {
std::slice::from_raw_parts(
&data_hdr as *const DataObjectHeader as *const u8,
DATA_OBJECT_HEADER_SIZE,
)
};
self.file.write_all(hdr_bytes)?;
if self.compact {
self.file.write_all(&[0u8; 8])?;
}
let (final_payload, compress_flag, incompat_flag) = if payload.len() as u64 >= self.compress_threshold_bytes {
self.try_compress_payload(payload)
} else {
(payload.to_vec(), 0u8, 0u32)
};
self.file.write_all(&final_payload)?;
let effective_actual_size = if compress_flag != 0 {
let new_actual_size =
data_payload_offset(self.compact) + final_payload.len() as u64;
self.file.seek(SeekFrom::Start(obj_offset + 8))?;
self.file.write_all(&le64(new_actual_size))?;
self.file.seek(SeekFrom::Start(obj_offset + 1))?;
self.file.write_all(&[compress_flag])?;
self.set_incompatible_flag(incompat_flag)?;
new_actual_size
} else {
actual_size
};
let effective_total_size = align64(effective_actual_size);
let written = data_payload_offset(self.compact) + final_payload.len() as u64;
if effective_total_size > written {
self.file.seek(SeekFrom::Start(obj_offset + written))?;
write_zeros(&mut self.file, effective_total_size - written)?;
}
self.offset = obj_offset + effective_total_size;
self.tail_object_offset = obj_offset;
self.n_objects += 1;
self.journal_file_link_data(obj_offset, bucket)?;
Ok((obj_offset, true))
}
/// Reads the payload of the DATA object at `data_offset`, transparently
/// decompressing it according to the object's compression flags.
///
/// # Errors
/// Fails when the object size is smaller than the payload offset, when
/// decompression fails, or when the object uses a codec whose cargo
/// feature is not compiled in.
pub fn journal_file_data_payload(&mut self, data_offset: u64) -> Result<Vec<u8>> {
    let obj_size = self.read_u64_at(data_offset + 8)?;
    // Object flags byte (offset + 1) carries the compression bits.
    let flags_byte = {
        self.file.seek(SeekFrom::Start(data_offset + 1))?;
        let mut buf = [0u8; 1];
        self.file.read_exact(&mut buf)?;
        buf[0]
    };
    let payload_base = data_payload_offset(self.compact);
    if obj_size < payload_base {
        return Err(Error::CorruptObject {
            offset: data_offset,
            reason: format!("DATA object size {} < minimum {}", obj_size, payload_base),
        });
    }
    let payload_len = obj_size - payload_base;
    let raw = self.read_bytes_at(data_offset + payload_base, payload_len as usize)?;
    let compressed = flags_byte & obj_flags::COMPRESSED_MASK;
    if compressed & obj_flags::COMPRESSED_ZSTD != 0 {
        #[cfg(feature = "zstd-compression")]
        {
            return zstd::decode_all(raw.as_slice())
                .map_err(|e| Error::Decompression(format!("ZSTD decompression failed: {}", e)));
        }
        #[cfg(not(feature = "zstd-compression"))]
        {
            return Err(Error::InvalidFile(
                "journal uses ZSTD compression but feature not enabled".into(),
            ));
        }
    } else if compressed & obj_flags::COMPRESSED_XZ != 0 {
        #[cfg(feature = "xz-compression")]
        {
            use std::io::Read as _;
            let mut decoder = xz2::read::XzDecoder::new(raw.as_slice());
            let mut decompressed = Vec::new();
            decoder
                .read_to_end(&mut decompressed)
                .map_err(|e| Error::Decompression(format!("XZ decompression failed: {}", e)))?;
            return Ok(decompressed);
        }
        #[cfg(not(feature = "xz-compression"))]
        {
            return Err(Error::InvalidFile(
                "journal uses XZ compression but feature not enabled".into(),
            ));
        }
    } else if compressed & obj_flags::COMPRESSED_LZ4 != 0 {
        #[cfg(feature = "lz4-compression")]
        {
            // LZ4 payloads carry an 8-byte LE uncompressed-size prefix.
            if raw.len() < 8 {
                return Err(Error::Decompression("LZ4 data too short".into()));
            }
            let uncompressed_size =
                u64::from_le_bytes(raw[..8].try_into().unwrap()) as usize;
            let compressed_data = &raw[8..];
            let decompressed =
                lz4_flex::decompress(compressed_data, uncompressed_size)
                    .map_err(|e| Error::Decompression(format!("LZ4 decompression failed: {}", e)))?;
            return Ok(decompressed);
        }
        #[cfg(not(feature = "lz4-compression"))]
        {
            return Err(Error::InvalidFile(
                "journal uses LZ4 compression but feature not enabled".into(),
            ));
        }
    } else if compressed != 0 {
        // A compression bit we do not recognize at all.
        return Err(Error::InvalidFile(
            "cannot read compressed data payload (unsupported codec)".into(),
        ));
    }
    Ok(raw)
}
/// Writes `entry_offset` into slot `index` of the ENTRY_ARRAY object at
/// `arr_offset`. Compact files store 4-byte slots, regular files 8-byte.
fn write_entry_array_item(
    &mut self,
    arr_offset: u64,
    index: u64,
    entry_offset: u64,
) -> Result<()> {
    let item_sz = entry_array_item_size(self.compact);
    let slot_off = arr_offset + ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + index * item_sz;
    self.file.seek(SeekFrom::Start(slot_off))?;
    if self.compact {
        // Invariant: journal_file_allocate caps compact files below
        // 4 GiB, so every offset fits in 32 bits.
        assert!(entry_offset <= u32::MAX as u64, "compact mode offset exceeds 4GB");
        self.file.write_all(&(entry_offset as u32).to_le_bytes())?;
    } else {
        self.file.write_all(&entry_offset.to_le_bytes())?;
    }
    Ok(())
}
/// Appends `entry_offset` to an entry-array chain, creating or chaining
/// new ENTRY_ARRAY objects as needed.
///
/// * `first_offset_ptr` — in/out: offset of the chain's first array
///   (0 when the chain does not exist yet).
/// * `idx` — in/out: number of entries already stored in the chain.
/// * `tail` — in/out cached cursor `(tail_array_offset, used_slots)`;
///   when `None`, the tail is located by walking the chain from the head.
fn link_entry_into_array(
    &mut self,
    first_offset_ptr: &mut u64, idx: &mut u64, tail: &mut Option<(u64, u64)>, entry_offset: u64,
) -> Result<()> {
    let hidx = *idx;
    if let Some((tail_arr, tail_used)) = *tail {
        // Fast path: we know the tail array and how full it is.
        let tail_capacity = self.read_entry_array_capacity(tail_arr)?;
        if tail_used < tail_capacity {
            self.write_entry_array_item(tail_arr, tail_used, entry_offset)?;
            *tail = Some((tail_arr, tail_used + 1));
            *idx = hidx + 1;
            return Ok(());
        }
        // Tail is full: chain a new array (capacity grows with usage)
        // and link it via the old tail's next pointer.
        let new_cap = self.compute_new_array_capacity(hidx, tail_capacity);
        let new_arr = self.write_entry_array_object(new_cap, 0)?;
        self.write_entry_array_item(new_arr, 0, entry_offset)?;
        self.write_u64_at(tail_arr + OBJECT_HEADER_SIZE as u64, new_arr)?;
        *tail = Some((new_arr, 1));
        *idx = hidx + 1;
    } else if *first_offset_ptr != 0 {
        // No cached cursor: find the real tail by walking the chain.
        let actual_tail = self.find_tail_array(*first_offset_ptr)?;
        let tail_capacity = self.read_entry_array_capacity(actual_tail)?;
        let (_, tail_used) = walk_entry_array_chain_at(&mut self.file, actual_tail, self.compact)?;
        if tail_used < tail_capacity {
            self.write_entry_array_item(actual_tail, tail_used, entry_offset)?;
            *tail = Some((actual_tail, tail_used + 1));
            *idx = hidx + 1;
        } else {
            let new_cap = self.compute_new_array_capacity(hidx, tail_capacity);
            let new_arr = self.write_entry_array_object(new_cap, 0)?;
            self.write_entry_array_item(new_arr, 0, entry_offset)?;
            self.write_u64_at(actual_tail + OBJECT_HEADER_SIZE as u64, new_arr)?;
            *tail = Some((new_arr, 1));
            *idx = hidx + 1;
        }
    } else {
        // Chain does not exist yet: create the first array (at least 4 slots).
        let new_cap = 4u64.max(self.compute_new_array_capacity(hidx, 0));
        let new_arr = self.write_entry_array_object(new_cap, 0)?;
        *first_offset_ptr = new_arr;
        self.write_entry_array_item(new_arr, 0, entry_offset)?;
        *tail = Some((new_arr, 1));
        *idx = hidx + 1;
    }
    Ok(())
}
/// Links `entry_offset` to a DATA object that stores its first entry
/// inline (at `extra_ptr`) and all further entries in an entry-array
/// chain (head pointer at `first_offset_ptr`, count at `n_entries_ptr`).
///
/// Keeps a per-data tail cursor cache so appends do not have to rewalk
/// the array chain each time.
fn link_entry_into_array_plus_one(
    &mut self,
    extra_ptr: u64, first_offset_ptr: u64, n_entries_ptr: u64, data_offset: u64, entry_offset: u64,
) -> Result<()> {
    let n_entries = self.read_u64_at(n_entries_ptr)?;
    if n_entries == u64::MAX {
        return Err(Error::InvalidFile("n_entries overflow in data object".into()));
    }
    // First entry for this data object is stored inline, not in an array.
    if n_entries == 0 {
        self.write_u64_at(extra_ptr, entry_offset)?;
        self.write_u64_at(n_entries_ptr, 1)?;
        return Ok(());
    }
    // The array chain holds entries 2..n, hence index n_entries - 1.
    let mut array_idx = n_entries - 1;
    let mut head_arr = self.read_u64_at(first_offset_ptr)?;
    let cached = self.data_tail_cache.get(&data_offset).copied();
    let mut tail = cached;
    self.link_entry_into_array(
        &mut head_arr,
        &mut array_idx,
        &mut tail,
        entry_offset,
    )?;
    // Persist the chain head when it was just created or changed.
    if cached.is_none() || self.read_u64_at(first_offset_ptr)? != head_arr {
        self.write_u64_at(first_offset_ptr, head_arr)?;
    }
    if let Some(t) = tail {
        self.data_tail_cache.insert(data_offset, t);
        // Compact data objects also persist the tail cursor on disk as
        // two u32s (offset, used count) at data_offset + 64.
        if self.compact {
            let tail_arr_off = t.0 as u32;
            let tail_arr_n = t.1 as u32;
            self.file.seek(SeekFrom::Start(data_offset + 64))?;
            self.file.write_all(&tail_arr_off.to_le_bytes())?;
            self.file.write_all(&tail_arr_n.to_le_bytes())?;
        }
    }
    self.write_u64_at(n_entries_ptr, n_entries + 1)?;
    Ok(())
}
/// Appends `entry_offset` to the file-global entry-array chain.
/// Works on local copies of the chain cursor state, committing them
/// back only after the link succeeded.
fn link_entry_into_global_array(&mut self, entry_offset: u64) -> Result<()> {
    let (mut first, mut idx, mut tail) = (
        self.entry_array_offset,
        self.global_n_entries,
        self.global_tail,
    );
    self.link_entry_into_array(&mut first, &mut idx, &mut tail, entry_offset)?;
    self.entry_array_offset = first;
    self.global_n_entries = idx;
    self.global_tail = tail;
    Ok(())
}
/// Links `entry_offset` to the DATA object at `data_offset`, using the
/// object's entry_offset (+40), entry_array_offset (+48) and
/// n_entries (+56) fields.
fn link_entry_into_data_array(
    &mut self,
    data_offset: u64,
    entry_offset: u64,
) -> Result<()> {
    self.link_entry_into_array_plus_one(
        data_offset + 40, data_offset + 48, data_offset + 56, data_offset, entry_offset,
    )
}
/// Writes one entry item (data-object reference) into slot `index` of
/// the ENTRY object at `entry_offset`.
///
/// Regular files store `(offset, hash)` pairs; compact files store only
/// a 4-byte offset and drop the hash.
fn write_entry_item(
    &mut self,
    entry_offset: u64,
    index: u64,
    item: &(u64, u64),
) -> Result<()> {
    let isize = entry_item_size(self.compact);
    let item_off = entry_offset + ENTRY_OBJECT_HEADER_SIZE as u64 + index * isize;
    self.file.seek(SeekFrom::Start(item_off))?;
    if self.compact {
        // Invariant: journal_file_allocate caps compact files below
        // 4 GiB, so every offset fits in 32 bits.
        assert!(item.0 <= u32::MAX as u64, "compact mode offset exceeds 4GB");
        self.file.write_all(&(item.0 as u32).to_le_bytes())?;
    } else {
        let entry_item = EntryItem {
            object_offset: le64(item.0),
            hash: le64(item.1),
        };
        // SAFETY: serializes the raw in-memory bytes of the item struct;
        // assumes the C-compatible layout used by all append paths.
        let item_bytes = unsafe {
            std::slice::from_raw_parts(
                &entry_item as *const EntryItem as *const u8,
                ENTRY_ITEM_SIZE,
            )
        };
        self.file.write_all(item_bytes)?;
    }
    Ok(())
}
/// Writes an ENTRY object (header plus per-field items) at the current
/// append offset and returns its offset. Linking the entry into the
/// arrays/header is done separately by `journal_file_link_entry`.
///
/// With `strict_order` enabled, entries whose realtime goes backwards —
/// or whose monotonic goes backwards within the same boot — are
/// rejected.
fn journal_file_append_entry_internal(
    &mut self,
    seqnum: u64,
    realtime: u64,
    monotonic: u64,
    boot_id: &[u8; 16],
    xor_hash: u64,
    items: &[(u64, u64)],
) -> Result<u64> {
    let n_items = items.len();
    if self.strict_order {
        if realtime < self.tail_entry_realtime {
            return Err(Error::InvalidFile(format!(
                "realtime {} < previous realtime {}",
                realtime, self.tail_entry_realtime
            )));
        }
        // Monotonic time is only comparable within a single boot.
        if self.prev_boot_id == *boot_id
            && monotonic < self.tailentry_monotonic
        {
            return Err(Error::InvalidFile(format!(
                "monotonic {} < previous monotonic {} (same boot)",
                monotonic, self.tailentry_monotonic
            )));
        }
    }
    // (Dead empty `if self.n_entries == 0 {} else if self.seqnum_id != …`
    // branches removed: neither condition has side effects and both
    // bodies were empty.)
    // Lazily initialize the machine ID on first use.
    if self.machine_id == [0u8; 16] {
        self.machine_id = machine_id();
    }
    let actual_size =
        ENTRY_OBJECT_HEADER_SIZE as u64 + (n_items as u64) * entry_item_size(self.compact);
    let total_size = align64(actual_size);
    let obj_offset = self.offset;
    self.journal_file_allocate(obj_offset, total_size)?;
    let entry_hdr = EntryObjectHeader {
        object: ObjectHeader {
            object_type: ObjectType::Entry as u8,
            flags: 0,
            reserved: [0; 6],
            // The header records the unaligned (actual) size.
            size: le64(actual_size),
        },
        seqnum: le64(seqnum),
        realtime: le64(realtime),
        monotonic: le64(monotonic),
        boot_id: *boot_id,
        xor_hash: le64(xor_hash),
    };
    self.file.seek(SeekFrom::Start(obj_offset))?;
    // SAFETY: serializes the raw in-memory bytes of the header struct;
    // assumes the C-compatible layout used by all append paths.
    let hdr_bytes = unsafe {
        std::slice::from_raw_parts(
            &entry_hdr as *const EntryObjectHeader as *const u8,
            ENTRY_OBJECT_HEADER_SIZE,
        )
    };
    self.file.write_all(hdr_bytes)?;
    for (i, item) in items.iter().enumerate() {
        self.write_entry_item(obj_offset, i as u64, item)?;
    }
    // Zero-pad the 64-bit alignment gap, if any.
    let written =
        ENTRY_OBJECT_HEADER_SIZE as u64 + n_items as u64 * entry_item_size(self.compact);
    if total_size > written {
        write_zeros(&mut self.file, total_size - written)?;
    }
    self.offset += total_size;
    self.tail_object_offset = obj_offset;
    self.tail_entry_offset = obj_offset;
    self.n_objects += 1;
    self.prev_boot_id = *boot_id;
    Ok(obj_offset)
}
/// Links a freshly written ENTRY object into the global entry array and
/// into every referenced DATA object, then updates the header's
/// head/tail bookkeeping and writes the header back.
fn journal_file_link_entry(
    &mut self,
    entry_offset: u64,
    items: &[(u64, u64)],
) -> Result<()> {
    // Order the entry's writes before the links that make it reachable.
    std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
    self.link_entry_into_global_array(entry_offset)?;
    // Read the entry's timestamps/seqnum back from disk:
    // +16 seqnum, +24 realtime, +32 monotonic.
    let realtime = self.read_u64_at(entry_offset + 24)?; let monotonic = self.read_u64_at(entry_offset + 32)?; let seqnum = self.read_u64_at(entry_offset + 16)?;
    // First entry fixes the head values.
    if self.head_entry_realtime == 0 {
        self.head_entry_realtime = realtime;
        self.head_entry_seqnum = seqnum;
    }
    self.tail_entry_realtime = realtime;
    self.tailentry_monotonic = monotonic;
    self.tail_entry_seqnum = seqnum;
    self.n_entries = self.global_n_entries;
    for &(data_offset, _) in items {
        match self.link_entry_into_data_array(data_offset, entry_offset) {
            Ok(()) => {}
            // A full file must not fail the entry as a whole: the entry
            // is already in the global array; per-data links are
            // best-effort at this point.
            Err(Error::FileTooLarge(_)) => {
            }
            Err(e) => return Err(e),
        }
    }
    self.write_header()?;
    Ok(())
}
/// Flushes buffered writes after a change and invokes the registered
/// post-change callback, rate-limited by `post_change_timer_period`
/// (microsecond units, matching `realtime_now`; 0 = no limit).
fn journal_file_post_change(&mut self) -> Result<()> {
    self.file.flush()?;
    // NOTE(review): set_len(metadata().len()) sets the length to its
    // current value, which is a no-op; possibly intended to trim the
    // preallocated tail down to self.offset — confirm.
    if let Ok(actual_size) = self.file.metadata().map(|m| m.len()) {
        let _ = self.file.set_len(actual_size);
    }
    if let Some(ref mut cb) = self.post_change_callback {
        let now = realtime_now();
        // Fire immediately when unthrottled, otherwise only after the
        // configured period has elapsed since the last invocation.
        if self.post_change_timer_period == 0
            || now.saturating_sub(self.last_post_change) >= self.post_change_timer_period
        {
            cb();
            self.last_post_change = now;
        }
    }
    Ok(())
}
/// Registers a callback invoked after journal changes, throttled to at
/// most once per `timer_period_usec` microseconds (0 = every change).
pub fn set_post_change_callback(
    &mut self,
    callback: impl FnMut() + Send + 'static,
    timer_period_usec: u64,
) {
    self.post_change_callback = Some(Box::new(callback));
    self.post_change_timer_period = timer_period_usec;
}
/// Returns the realtime range `(head, tail)` covered by this file, or
/// `None` when the file has no entries or either boundary is unset (0).
pub fn journal_file_get_cutoff_realtime_usec(&self) -> Option<(u64, u64)> {
    let usable = self.n_entries != 0
        && self.head_entry_realtime != 0
        && self.tail_entry_realtime != 0;
    if usable {
        Some((self.head_entry_realtime, self.tail_entry_realtime))
    } else {
        None
    }
}
/// Returns the monotonic time range `(first, last)` covered by this
/// file for the given boot, or `None` when the boot has no entries.
///
/// Implemented by looking up the `_BOOT_ID=<hex>` data object and
/// reading the monotonic timestamp (entry offset + 32) of its first
/// and last linked entries.
pub fn journal_file_get_cutoff_monotonic_usec(
    &mut self,
    boot_id: &[u8; 16],
) -> Option<(u64, u64)> {
    if self.n_entries == 0 {
        return None;
    }
    let boot_id_hex = boot_id.iter().map(|b| format!("{:02x}", b)).collect::<String>();
    let boot_data = format!("_BOOT_ID={}", boot_id_hex);
    let h = journal_file_hash_data(boot_data.as_bytes(), self.keyed_hash, &self.file_id);
    // Any lookup/read failure maps to "no range available".
    let data_offset = match self.journal_file_find_data_object_with_hash(boot_data.as_bytes(), h) { Ok(Some(off)) => off, _ => return None, };
    let n_entries = match self.read_u64_at(data_offset + 56) { Ok(n) => n, Err(_) => return None, };
    if n_entries == 0 { return None; }
    // First entry is stored inline at data offset + 40.
    let feo = match self.read_u64_at(data_offset + 40) { Ok(off) if off > 0 => off, _ => return None, };
    let from = match self.read_u64_at(feo + 32) { Ok(ts) => ts, Err(_) => return None, };
    let to = match self.get_last_entry_monotonic_for_data(data_offset, n_entries) { Ok(ts) => ts, Err(_) => return None, };
    Some((from, to))
}
/// Find the monotonic timestamp of the last entry referencing the DATA
/// object at `data_offset`, by walking its entry-array chain.
fn get_last_entry_monotonic_for_data(&mut self, data_offset: u64, n_entries: u64) -> Result<u64> {
// Single entry: it is stored inline at entry_offset (+40); no array chain.
if n_entries == 1 {
let eo = self.read_u64_at(data_offset + 40)?;
return self.read_u64_at(eo + 32);
}
// entry_array_offset (+48); zero means only the inline entry exists.
let eao = self.read_u64_at(data_offset + 48)?;
if eao == 0 {
let eo = self.read_u64_at(data_offset + 40)?;
return self.read_u64_at(eo + 32);
}
let item_sz = entry_array_item_size(self.compact);
// The inline entry accounts for one of n_entries; the rest live in arrays.
let mut remaining = n_entries - 1;
let mut a = eao;
let mut last_array = a;
let mut last_array_n = 0u64;
// Walk the chain until we reach the array containing the final entry.
while a > 0 {
let obj_size = self.read_u64_at(a + 8)?;
let k = entry_array_n_items(obj_size, self.compact);
if k == 0 { break; }
last_array = a;
if remaining <= k { last_array_n = remaining; break; }
remaining -= k;
last_array_n = k;
// Next-array link sits immediately after the object header.
a = self.read_u64_at(a + OBJECT_HEADER_SIZE as u64)?;
}
if last_array_n > 0 {
// Read the final occupied slot; compact files store 32-bit offsets.
let slot_off = last_array + ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + (last_array_n - 1) * item_sz;
let eo = if self.compact {
self.file.seek(SeekFrom::Start(slot_off))?;
let mut buf = [0u8; 4];
self.file.read_exact(&mut buf)?;
u32::from_le_bytes(buf) as u64
} else {
self.read_u64_at(slot_off)?
};
// +32 is the entry's monotonic timestamp.
if eo > 0 { return self.read_u64_at(eo + 32); }
}
// Fall back to the inline first entry's timestamp.
let eo = self.read_u64_at(data_offset + 40)?;
self.read_u64_at(eo + 32)
}
/// Heuristically decide whether this file should be rotated: stale header
/// format, overfull hash tables, deep collision chains, inconsistent
/// counters, or entries older than `max_file_usec`.
pub fn journal_file_rotate_suggested(&mut self, max_file_usec: u64) -> bool {
    // A header smaller than the current format indicates an old/odd file.
    if let Ok(h) = self.read_header_raw() {
        if from_le64(&h.header_size) < HEADER_SIZE {
            return true;
        }
    }
    // Rotate once either hash table passes 75% fill...
    let data_ht_full = self.data_ht_n > 0 && self.n_data * 4 > self.data_ht_n * 3;
    let field_ht_full = self.field_ht_n > 0 && self.n_fields * 4 > self.field_ht_n * 3;
    // ...or a collision chain has grown too deep.
    let chains_deep = self.data_hash_chain_depth > HASH_CHAIN_DEPTH_MAX
        || self.field_hash_chain_depth > HASH_CHAIN_DEPTH_MAX;
    // Data objects without any field objects means the file is inconsistent.
    let inconsistent = self.n_data > 0 && self.n_fields == 0;
    if data_ht_full || field_ht_full || chains_deep || inconsistent {
        return true;
    }
    // Finally, rotate when the oldest entry exceeds the allowed file age.
    if max_file_usec > 0 && self.head_entry_realtime != 0 {
        let now = realtime_now();
        if now > self.head_entry_realtime && now - self.head_entry_realtime > max_file_usec {
            return true;
        }
    }
    false
}
/// Copy the entry at `entry_offset` in `source` into this file, re-hashing
/// and de-duplicating its data objects under this file's hash settings.
/// Returns the new entry's offset, or 0 when the source entry has no
/// usable items. Corrupt/truncated/undecompressible data items are skipped
/// with a diagnostic rather than failing the whole copy.
pub fn journal_file_copy_entry(
&mut self,
source: &mut JournalWriter,
entry_offset: u64,
machine_id_override: Option<&[u8; 16]>,
) -> Result<u64> {
// Adopt the override machine ID only if ours is still unset.
if let Some(mid) = machine_id_override {
if self.machine_id == [0u8; 16] {
self.machine_id = *mid;
}
}
// Entry object layout: size +8, realtime +24, monotonic +32, boot_id +40.
let entry_size = source.read_u64_at(entry_offset + 8)?;
let realtime = source.read_u64_at(entry_offset + 24)?;
let monotonic = source.read_u64_at(entry_offset + 32)?;
let boot_id_bytes = source.read_bytes_at(entry_offset + 40, 16)?;
let mut boot_id = [0u8; 16];
boot_id.copy_from_slice(&boot_id_bytes);
// Sanity bound for timestamps (2^55 usec is far beyond plausible clocks).
const TS_UPPER: u64 = 1u64 << 55;
if realtime == 0 || realtime >= TS_UPPER {
return Err(Error::InvalidFile(format!(
"copy_entry: invalid realtime {}",
realtime
)));
}
if monotonic >= TS_UPPER {
return Err(Error::InvalidFile(format!(
"copy_entry: invalid monotonic {}",
monotonic
)));
}
if boot_id == [0u8; 16] {
return Err(Error::InvalidFile("copy_entry: empty boot ID".into()));
}
// Item width depends on the *source* file's compact flag.
let source_compact = source.is_compact();
let n_items = journal_file_entry_n_items(entry_size, source_compact);
if n_items == 0 {
return Ok(0);
}
let mut items: Vec<(u64, u64)> = Vec::with_capacity(n_items as usize);
let mut xor_hash: u64 = 0;
let src_item_size = entry_item_size(source_compact);
for i in 0..n_items {
let item_off = entry_offset
+ ENTRY_OBJECT_HEADER_SIZE as u64
+ i * src_item_size;
// Compact files store 32-bit data offsets; regular files use 64-bit.
let data_offset = if source_compact {
let bytes = source.read_bytes_at(item_off, 4)?;
u32::from_le_bytes(bytes.try_into().unwrap()) as u64
} else {
source.read_u64_at(item_off)?
};
if data_offset == 0 {
continue;
}
// Tolerate damaged source data: skip the item, keep the entry.
let payload = match source.journal_file_data_payload(data_offset) {
Ok(p) => p,
Err(Error::CorruptObject { offset, reason }) => {
eprintln!(
"copy_entry: skipping corrupt DATA object at {:#x}: {}",
offset, reason
);
continue;
}
Err(Error::Decompression(msg)) => {
eprintln!(
"copy_entry: skipping DATA at {:#x} with decompression error: {}",
data_offset, msg
);
continue;
}
Err(Error::Truncated { offset }) => {
eprintln!(
"copy_entry: skipping truncated DATA object at {:#x}",
offset
);
continue;
}
Err(e) => return Err(e),
};
// Re-hash under *our* hash mode; the entry's xor_hash always uses the
// plain 64-bit hash when keyed hashing is enabled.
let h = journal_file_hash_data(&payload, self.keyed_hash, &self.file_id);
if self.keyed_hash {
xor_hash ^= hash64(&payload);
} else {
xor_hash ^= h;
}
let (dest_data_offset, is_new) = self.journal_file_append_data(&payload, h)?;
if is_new {
// Link the new data object at the head of its field's data chain.
if let Some(eq) = payload.iter().position(|&b| b == b'=') {
let field_name = &payload[..eq];
let field_offset = self.journal_file_append_field(field_name)?;
let field_head_ptr = field_offset + 32;
let old_head = self.read_u64_at(field_head_ptr)?;
self.write_u64_at(dest_data_offset + 32, old_head)?;
self.write_u64_at(field_head_ptr, dest_data_offset)?;
}
}
items.push((dest_data_offset, h));
}
if items.is_empty() {
return Ok(0);
}
items.sort_by(entry_item_cmp);
remove_duplicate_entry_items(&mut items);
// Allocate a fresh seqnum in this file, then append and link the entry.
let seqnum_val = self.journal_file_entry_seqnum(None);
let entry_off = self.journal_file_append_entry_internal(
seqnum_val, realtime, monotonic, &boot_id, xor_hash, &items,
)?;
self.journal_file_link_entry(entry_off, &items)?;
self.journal_file_post_change()?;
Ok(entry_off)
}
/// Produce a textual listing of every object in the file (offset, type,
/// size), stopping at the first structurally invalid object.
pub fn journal_file_dump(&mut self) -> Result<String> {
let mut output = String::new();
let header_size = HEADER_SIZE;
let total_size = self.offset;
let mut cur = header_size;
output.push_str(&format!(
"Journal file dump: {} bytes, {} objects\n",
total_size, self.n_objects
));
while cur < total_size {
// Re-align if the cursor drifted off the 8-byte object grid.
if !valid64(cur) {
cur = align64(cur);
if cur >= total_size {
break;
}
}
// First byte of every object header is its type tag.
let obj_type_byte = {
self.file.seek(SeekFrom::Start(cur))?;
let mut buf = [0u8; 1];
self.file.read_exact(&mut buf)?;
buf[0]
};
let obj_size = self.read_u64_at(cur + 8)?;
if obj_size < OBJECT_HEADER_SIZE as u64 {
output.push_str(&format!(
" offset={:#x}: INVALID (size {})\n",
cur, obj_size
));
break;
}
let type_name = match ObjectType::try_from(obj_type_byte) {
Ok(t) => format!("{:?}", t),
Err(_) => format!("Unknown({})", obj_type_byte),
};
output.push_str(&format!(
" offset={:#x}: type={}, size={}\n",
cur, type_name, obj_size
));
// Objects are stored 8-byte aligned; advance by the padded size.
cur += align64(obj_size);
}
Ok(output)
}
/// Render the on-disk header as a human-readable multi-line string.
pub fn journal_file_print_header(&mut self) -> Result<String> {
    let h = self.read_header_raw()?;
    let state = match h.state {
        0 => "OFFLINE",
        1 => "ONLINE",
        2 => "ARCHIVED",
        _ => "UNKNOWN",
    };
    // Build every line up front, then emit them newline-terminated.
    let rows = vec![
        format!("File ID: {:02x?}", h.file_id),
        format!("Machine ID: {:02x?}", h.machine_id),
        format!("Boot ID: {:02x?}", h.tail_entry_boot_id),
        format!("Seqnum ID: {:02x?}", h.seqnum_id),
        format!("State: {}", state),
        format!("Compatible Flags: {:#010x}", from_le32(&h.compatible_flags)),
        format!("Incompatible Flags: {:#010x}", from_le32(&h.incompatible_flags)),
        format!("Header size: {}", from_le64(&h.header_size)),
        format!("Arena size: {}", from_le64(&h.arena_size)),
        format!("Data Hash Table Offset: {}", from_le64(&h.data_hash_table_offset)),
        format!("Data Hash Table Size: {}", from_le64(&h.data_hash_table_size)),
        format!("Field Hash Table Offset: {}", from_le64(&h.field_hash_table_offset)),
        format!("Field Hash Table Size: {}", from_le64(&h.field_hash_table_size)),
        format!("Tail Object Offset: {}", from_le64(&h.tail_object_offset)),
        format!("Objects: {}", from_le64(&h.n_objects)),
        format!("Entries: {}", from_le64(&h.n_entries)),
        format!("Data Objects: {}", from_le64(&h.n_data)),
        format!("Field Objects: {}", from_le64(&h.n_fields)),
        format!("Entry Arrays: {}", from_le64(&h.n_entry_arrays)),
        format!("Head Entry Seqnum: {}", from_le64(&h.head_entry_seqnum)),
        format!("Tail Entry Seqnum: {}", from_le64(&h.tail_entry_seqnum)),
        format!("Entry Array Offset: {}", from_le64(&h.entry_array_offset)),
        format!("Head Entry Realtime: {}", from_le64(&h.head_entry_realtime)),
        format!("Tail Entry Realtime: {}", from_le64(&h.tail_entry_realtime)),
        format!("Tail Entry Monotonic: {}", from_le64(&h.tail_entry_monotonic)),
        format!("Data Hash Chain Depth: {}", from_le64(&h.data_hash_chain_depth)),
        format!("Field Hash Chain Depth: {}", from_le64(&h.field_hash_chain_depth)),
        format!("Tail Entry Array Offset: {}", from_le32(&h.tail_entry_array_offset)),
        format!("Tail Entry Array N Entries: {}", from_le32(&h.tail_entry_array_n_entries)),
        format!("Tail Entry Offset: {}", from_le64(&h.tail_entry_offset)),
    ];
    let mut output = String::new();
    for row in rows {
        output.push_str(&row);
        output.push('\n');
    }
    Ok(output)
}
/// Number of item slots the entry-array object at `arr_offset` can hold,
/// derived from its on-disk object size (stored at +8 in the header).
fn read_entry_array_capacity(&mut self, arr_offset: u64) -> Result<u64> {
    let size = self.read_u64_at(arr_offset + 8)?;
    Ok(entry_array_n_items(size, self.compact))
}
/// Capacity for the next entry array in a chain: double the previous
/// capacity (or enough to cover `hidx` when it is already past it),
/// never below 4 slots.
fn compute_new_array_capacity(&self, hidx: u64, prev_n: u64) -> u64 {
    let base = if hidx > prev_n { hidx + 1 } else { prev_n };
    (base * 2).max(4)
}
/// Append a zero-initialized entry-array object with room for `capacity`
/// items, linked to `next`, updating the writer's size/offset counters.
/// Returns the new object's offset.
fn write_entry_array_object(&mut self, capacity: u64, next: u64) -> Result<u64> {
let item_bytes = capacity * entry_array_item_size(self.compact);
let actual_size = ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + item_bytes;
// On-disk objects are padded to 8-byte alignment.
let total_size = align64(actual_size);
let obj_offset = self.offset;
self.journal_file_allocate(obj_offset, total_size)?;
let hdr = EntryArrayObjectHeader {
object: ObjectHeader {
object_type: ObjectType::EntryArray as u8,
flags: 0,
reserved: [0; 6],
size: le64(actual_size), },
next_entry_array_offset: le64(next),
};
self.file.seek(SeekFrom::Start(obj_offset))?;
// SAFETY: serializes the header struct as raw bytes; assumes
// ENTRY_ARRAY_OBJECT_HEADER_SIZE == size_of::<EntryArrayObjectHeader>()
// (the convention used throughout this file).
let hdr_bytes = unsafe {
std::slice::from_raw_parts(
&hdr as *const EntryArrayObjectHeader as *const u8,
ENTRY_ARRAY_OBJECT_HEADER_SIZE,
)
};
self.file.write_all(hdr_bytes)?;
// Zero-fill the item slots plus the alignment padding.
write_zeros(&mut self.file, total_size - ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64)?;
self.offset += total_size;
self.tail_object_offset = obj_offset;
self.n_objects += 1;
self.n_entry_arrays += 1;
Ok(obj_offset)
}
/// Walk an entry-array chain from `head` until the array holding
/// `target_idx` is reached. Returns (array offset, index within that
/// array, array capacity); if the chain ends before the index is reached,
/// returns the last array with index == capacity.
#[allow(dead_code)]
fn walk_to_array_slot(
    &mut self,
    head: u64,
    target_idx: u64,
) -> Result<(u64, u64, u64)> {
    let mut arr = head;
    let mut idx = target_idx;
    loop {
        let cap = self.read_entry_array_capacity(arr)?;
        if idx < cap {
            return Ok((arr, idx, cap));
        }
        idx -= cap;
        // The next-array link sits immediately after the object header.
        match self.read_u64_at(arr + OBJECT_HEADER_SIZE as u64)? {
            0 => return Ok((arr, cap, cap)),
            next => arr = next,
        }
    }
}
/// Follow next-pointers from `head` to the final array in the chain.
fn find_tail_array(&mut self, head: u64) -> Result<u64> {
    let mut tail = head;
    loop {
        match self.read_u64_at(tail + OBJECT_HEADER_SIZE as u64)? {
            0 => return Ok(tail),
            next => tail = next,
        }
    }
}
/// Serialize the writer's in-memory state back into the on-disk header,
/// marking the file Online. Hash-table offsets/sizes are stored relative
/// to their payloads (i.e. past the object header), matching the format.
fn write_header(&mut self) -> Result<()> {
let data_ht_actual_size =
OBJECT_HEADER_SIZE as u64 + self.data_ht_n * HASH_ITEM_SIZE as u64;
let field_ht_actual_size =
OBJECT_HEADER_SIZE as u64 + self.field_ht_n * HASH_ITEM_SIZE as u64;
// Arena = everything after the fixed header.
let arena_size = self.offset - HEADER_SIZE;
// Start from the current on-disk header so fields we do not overwrite
// (ids, flags) are preserved as-is.
let mut h = self.read_header_raw()?;
h.state = FileState::Online as u8;
h.arena_size = le64(arena_size);
h.tail_object_offset = le64(self.tail_object_offset);
h.n_objects = le64(self.n_objects);
h.n_entries = le64(self.n_entries);
h.n_data = le64(self.n_data);
h.n_fields = le64(self.n_fields);
h.n_tags = [0u8; 8];
h.n_entry_arrays = le64(self.n_entry_arrays);
h.tail_entry_seqnum = le64(self.tail_entry_seqnum);
h.head_entry_seqnum = le64(self.head_entry_seqnum);
h.entry_array_offset = le64(self.entry_array_offset);
h.head_entry_realtime = le64(self.head_entry_realtime);
h.tail_entry_realtime = le64(self.tail_entry_realtime);
h.tail_entry_monotonic = le64(self.tailentry_monotonic);
h.tail_entry_boot_id = self.prev_boot_id;
h.data_hash_chain_depth = le64(self.data_hash_chain_depth);
h.field_hash_chain_depth = le64(self.field_hash_chain_depth);
h.tail_entry_offset = le64(self.tail_entry_offset);
// Tail entry-array bookkeeping; zeroed when no global tail is cached.
let (tail_arr_off, tail_arr_n) = if let Some((off, n)) = self.global_tail {
(off, n)
} else {
(0, 0)
};
h.tail_entry_array_offset = le32(tail_arr_off as u32);
h.tail_entry_array_n_entries = le32(tail_arr_n as u32);
h.data_hash_table_offset =
le64(self.data_ht_offset + OBJECT_HEADER_SIZE as u64);
h.data_hash_table_size =
le64(data_ht_actual_size - OBJECT_HEADER_SIZE as u64);
h.field_hash_table_offset =
le64(self.field_ht_offset + OBJECT_HEADER_SIZE as u64);
h.field_hash_table_size =
le64(field_ht_actual_size - OBJECT_HEADER_SIZE as u64);
self.file.seek(SeekFrom::Start(0))?;
self.file.write_all(header_as_bytes(&h))?;
Ok(())
}
/// Read the raw on-disk header from the start of the file.
fn read_header_raw(&mut self) -> Result<Header> {
    self.file.seek(SeekFrom::Start(0))?;
    // Header is 272 bytes on disk (asserted by test_header_size).
    let mut raw = [0u8; std::mem::size_of::<Header>()];
    self.file.read_exact(&mut raw)?;
    // SAFETY: `raw` holds exactly size_of::<Header>() bytes; Header is
    // treated as plain bytes throughout this file, and read_unaligned
    // copes with the buffer's arbitrary alignment.
    Ok(unsafe { std::ptr::read_unaligned(raw.as_ptr() as *const Header) })
}
}
impl Drop for JournalWriter {
/// Best-effort teardown: join any pending offline thread, persist the
/// header, then flip the state to Archived/Offline and sync. All errors
/// are ignored because Drop cannot propagate them.
fn drop(&mut self) {
let _ = self.journal_file_set_offline_thread_join();
let _ = self.write_header();
if let Ok(mut h) = self.read_header_raw() {
if self.archived {
h.state = FileState::Archived as u8;
} else if h.state != FileState::Archived as u8 {
// Never downgrade an already-archived file back to Offline.
h.state = FileState::Offline as u8;
}
let _ = self.file.seek(SeekFrom::Start(0));
let _ = self.file.write_all(header_as_bytes(&h));
}
let _ = self.file.sync_all();
}
}
fn validate_field_name(name: &[u8]) -> Result<()> {
if !journal_field_valid(name, true) {
return Err(Error::InvalidFieldName(
String::from_utf8_lossy(name).into_owned(),
));
}
Ok(())
}
/// Construct a fresh on-disk header for a newly created journal file.
/// Hash-table offsets/sizes are stored payload-relative (past the object
/// header); `n_objects` starts at 2 for the two hash-table objects.
#[allow(clippy::too_many_arguments)]
fn build_header(
file_id: [u8; 16],
machine_id: [u8; 16],
seqnum_id: [u8; 16],
_boot_id: [u8; 16],
header_size: u64,
arena_size: u64,
data_ht_offset: u64,
data_ht_size: u64,
field_ht_offset: u64,
field_ht_size: u64,
state: FileState,
keyed_hash: bool,
compact: bool,
) -> Header {
// Incompatible flags advertise the hashing / offset-width formats in use.
let mut iflags = 0u32;
if keyed_hash {
iflags |= incompat::KEYED_HASH;
}
if compact {
iflags |= incompat::COMPACT;
}
Header {
signature: HEADER_SIGNATURE,
compatible_flags: le32(compat::TAIL_ENTRY_BOOT_ID),
incompatible_flags: le32(iflags),
state: state as u8,
reserved: [0; 7],
file_id,
machine_id,
tail_entry_boot_id: [0u8; 16],
seqnum_id,
header_size: le64(header_size),
arena_size: le64(arena_size),
data_hash_table_offset: le64(data_ht_offset + OBJECT_HEADER_SIZE as u64),
data_hash_table_size: le64(data_ht_size - OBJECT_HEADER_SIZE as u64),
field_hash_table_offset: le64(field_ht_offset + OBJECT_HEADER_SIZE as u64),
field_hash_table_size: le64(field_ht_size - OBJECT_HEADER_SIZE as u64),
// The field hash table is the last object written at creation time.
tail_object_offset: le64(field_ht_offset),
n_objects: le64(2),
n_entries: le64(0),
tail_entry_seqnum: le64(0),
head_entry_seqnum: le64(0),
entry_array_offset: le64(0),
head_entry_realtime: le64(0),
tail_entry_realtime: le64(0),
tail_entry_monotonic: le64(0),
n_data: le64(0),
n_fields: le64(0),
n_tags: le64(0),
n_entry_arrays: le64(0),
data_hash_chain_depth: le64(0),
field_hash_chain_depth: le64(0),
tail_entry_array_offset: le32(0),
tail_entry_array_n_entries: le32(0),
tail_entry_offset: le64(0),
}
}
/// View a `Header` as its raw on-disk byte representation.
fn header_as_bytes(h: &Header) -> &[u8] {
// SAFETY: the slice covers exactly size_of::<Header>() bytes of `h` and
// borrows it for the returned slice's lifetime.
unsafe {
std::slice::from_raw_parts(
h as *const Header as *const u8,
std::mem::size_of::<Header>(),
)
}
}
/// Write an empty (zeroed) hash-table object of `n` buckets at the file's
/// current position, padding the payload out to 8-byte alignment.
fn write_hash_table_object(file: &mut File, obj_type: ObjectType, n: u64) -> io::Result<()> {
let item_bytes = n * HASH_ITEM_SIZE as u64;
let actual_size = OBJECT_HEADER_SIZE as u64 + item_bytes;
let total = align64(actual_size);
let hdr = ObjectHeader {
object_type: obj_type as u8,
flags: 0,
reserved: [0; 6],
size: le64(actual_size), };
// SAFETY: serializes the header struct as raw bytes; assumes
// OBJECT_HEADER_SIZE == size_of::<ObjectHeader>() (the convention used
// throughout this file).
let hdr_bytes = unsafe {
std::slice::from_raw_parts(
&hdr as *const ObjectHeader as *const u8,
OBJECT_HEADER_SIZE,
)
};
file.write_all(hdr_bytes)?;
// Zero-fill the buckets plus the alignment padding.
write_zeros(file, total - OBJECT_HEADER_SIZE as u64)?;
Ok(())
}
/// Append `n` zero bytes at the file's current position, in 512-byte chunks.
fn write_zeros(file: &mut File, n: u64) -> io::Result<()> {
    const ZEROS: [u8; 512] = [0u8; 512];
    let mut left = n;
    while left > 0 {
        let take = left.min(ZEROS.len() as u64) as usize;
        file.write_all(&ZEROS[..take])?;
        left -= take as u64;
    }
    Ok(())
}
/// Read one hash-table bucket (`HashItem`) at the given absolute offset.
fn read_hash_item(file: &mut File, offset: u64) -> Result<HashItem> {
    file.seek(SeekFrom::Start(offset))?;
    let mut raw = [0u8; HASH_ITEM_SIZE];
    file.read_exact(&mut raw)?;
    // SAFETY: `raw` holds exactly HASH_ITEM_SIZE bytes; read_unaligned
    // copes with the buffer's arbitrary alignment.
    Ok(unsafe { std::ptr::read_unaligned(raw.as_ptr() as *const HashItem) })
}
/// Write one hash-table bucket (`HashItem`) at the given absolute offset.
fn write_hash_item(file: &mut File, offset: u64, item: &HashItem) -> Result<()> {
    file.seek(SeekFrom::Start(offset))?;
    // SAFETY: reinterprets the item as HASH_ITEM_SIZE raw bytes; the
    // reference is valid for that many bytes for the duration of the call.
    let raw = unsafe {
        std::slice::from_raw_parts(item as *const HashItem as *const u8, HASH_ITEM_SIZE)
    };
    file.write_all(raw)?;
    Ok(())
}
/// Follow an entry-array chain from `head` to its final array and report
/// (offset of that array, number of used slots in it).
fn walk_entry_array_chain(file: &mut File, head: u64, compact: bool) -> Result<(u64, u64)> {
    let mut arr = head;
    loop {
        // The next-array link sits immediately after the object header.
        file.seek(SeekFrom::Start(arr + OBJECT_HEADER_SIZE as u64))?;
        let mut link = [0u8; 8];
        file.read_exact(&mut link)?;
        match u64::from_le_bytes(link) {
            0 => {
                let (_capacity, used) = walk_entry_array_chain_at(file, arr, compact)?;
                return Ok((arr, used));
            }
            next => arr = next,
        }
    }
}
/// Inspect a single entry-array object and return (capacity, used slots),
/// where "used" is the index just past the last non-zero slot.
fn walk_entry_array_chain_at(file: &mut File, arr_offset: u64, compact: bool) -> Result<(u64, u64)> {
    // The object's total size lives at +8 in its header.
    file.seek(SeekFrom::Start(arr_offset + 8))?;
    let mut size_raw = [0u8; 8];
    file.read_exact(&mut size_raw)?;
    let obj_size = u64::from_le_bytes(size_raw);
    let item_sz = entry_array_item_size(compact);
    let capacity = obj_size.saturating_sub(ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64) / item_sz;
    // Scan backwards for the last slot holding a non-zero entry offset.
    let mut used = 0;
    for i in (0..capacity).rev() {
        let slot = arr_offset + ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + i * item_sz;
        file.seek(SeekFrom::Start(slot))?;
        let occupied = if compact {
            let mut raw = [0u8; 4];
            file.read_exact(&mut raw)?;
            u32::from_le_bytes(raw) != 0
        } else {
            let mut raw = [0u8; 8];
            file.read_exact(&mut raw)?;
            u64::from_le_bytes(raw) != 0
        };
        if occupied {
            used = i + 1;
            break;
        }
    }
    Ok((capacity, used))
}
/// Rename a journal file aside for disposal, producing
/// "<base>@<realtime>-<random>.journal~". The `_dir_fd` parameter is
/// accepted for API compatibility but unused.
pub fn journal_file_dispose(_dir_fd: Option<i32>, path: &str) -> Result<()> {
    let p = std::path::Path::new(path);
    let file_name = p
        .file_name()
        .and_then(|n| n.to_str())
        .ok_or_else(|| Error::InvalidFile("dispose: cannot extract filename".into()))?;
    let base = file_name
        .strip_suffix(".journal")
        .ok_or_else(|| Error::InvalidFile("dispose: path does not end in .journal".into()))?;
    // Best-effort random suffix from /dev/urandom; stays zero if the
    // device cannot be opened or read.
    let mut rand_raw = [0u8; 8];
    if let Ok(mut f) = File::open("/dev/urandom") {
        use std::io::Read as _;
        let _ = f.read_exact(&mut rand_raw);
    }
    let random_suffix = u64::from_ne_bytes(rand_raw);
    let rt = realtime_now();
    let new_name = format!("{}@{:016x}-{:016x}.journal~", base, rt, random_suffix);
    let new_path = p.with_file_name(&new_name);
    std::fs::rename(path, &new_path).map_err(|e| {
        Error::Io(std::io::Error::new(
            e.kind(),
            format!("dispose: failed to rename {:?} to {:?}: {}", path, new_path, e),
        ))
    })?;
    Ok(())
}
/// Extract the numeric UID from a per-user journal filename such as
/// "user-1000.journal" (or its "~" disposed variant). Returns `None`
/// for any other naming pattern.
pub fn journal_file_parse_uid_from_filename(path: &str) -> Option<u32> {
    let name = std::path::Path::new(path).file_name()?.to_str()?;
    let stem = name
        .strip_suffix(".journal~")
        .or_else(|| name.strip_suffix(".journal"))?;
    stem.strip_prefix("user-")?.parse().ok()
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
/// Path for a named test artifact inside the OS temp directory.
fn tmp_path(name: &str) -> PathBuf {
std::env::temp_dir().join(name)
}
// Creating a file and appending one entry must grow it past the header.
#[test]
fn test_create_and_write() {
let path = tmp_path("qjournal_test_write.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
w.append_entry(&[
("MESSAGE", b"Hello, journald!" as &[u8]),
("PRIORITY", b"6"),
("SYSLOG_IDENTIFIER", b"qjournal_test"),
])
.unwrap();
w.flush().unwrap();
drop(w);
let meta = std::fs::metadata(&path).unwrap();
assert!(meta.len() > HEADER_SIZE);
}
// An existing journal can be reopened and appended to.
#[test]
fn test_reopen_and_append() {
let path = tmp_path("qjournal_test_reopen.journal");
let _ = std::fs::remove_file(&path);
{
let mut w = JournalWriter::open(&path).unwrap();
w.append_entry(&[("MESSAGE", b"first" as &[u8])])
.unwrap();
w.flush().unwrap();
}
{
let mut w = JournalWriter::open(&path).unwrap();
w.append_entry(&[("MESSAGE", b"second" as &[u8])])
.unwrap();
w.flush().unwrap();
}
let meta = std::fs::metadata(&path).unwrap();
assert!(meta.len() > HEADER_SIZE);
}
// Lowercase or space-containing field names are rejected on append.
#[test]
fn test_field_name_validation() {
let path = tmp_path("qjournal_test_field.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
assert!(w.append_entry(&[("message", b"bad" as &[u8])]).is_err());
assert!(w
.append_entry(&[("MY FIELD", b"bad" as &[u8])])
.is_err());
assert!(w.append_entry(&[("MESSAGE", b"ok" as &[u8])]).is_ok());
}
// The on-disk header layout must stay exactly 272 bytes.
#[test]
fn test_header_size() {
assert_eq!(std::mem::size_of::<Header>(), 272);
}
// Many entries exercise entry-array chain growth; the count must match.
#[test]
fn test_many_entries_exponential_growth() {
let path = tmp_path("qjournal_test_many.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
for i in 0..500 {
let msg = format!("entry {}", i);
w.append_entry(&[
("MESSAGE", msg.as_bytes()),
("PRIORITY", b"6" as &[u8]),
])
.unwrap();
}
w.flush().unwrap();
assert_eq!(w.global_n_entries, 500);
drop(w);
}
// Identical payloads must reuse the same DATA object; new payloads add one.
#[test]
fn test_data_dedup_with_payload_check() {
let path = tmp_path("qjournal_test_dedup.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
w.append_entry(&[("MESSAGE", b"same" as &[u8])]).unwrap();
let n_data_after_first = w.n_data;
w.append_entry(&[("MESSAGE", b"same" as &[u8])]).unwrap();
assert_eq!(w.n_data, n_data_after_first);
w.append_entry(&[("MESSAGE", b"different" as &[u8])])
.unwrap();
assert_eq!(w.n_data, n_data_after_first + 1);
drop(w);
}
// Out-of-order fields must still be accepted (items are sorted internally).
#[test]
fn test_entry_items_sorted_and_deduped() {
let path = tmp_path("qjournal_test_sorted.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
w.append_entry(&[
("ZZZZZ", b"last" as &[u8]),
("AAAAA", b"first"),
("MESSAGE", b"middle"),
])
.unwrap();
w.flush().unwrap();
drop(w);
}
// Offset validity: 0 is allowed, unaligned/out-of-range offsets are not.
#[test]
fn test_offset_is_valid() {
assert!(offset_is_valid(0, HEADER_SIZE, 0));
assert!(offset_is_valid(0, HEADER_SIZE, u64::MAX));
assert!(!offset_is_valid(3, HEADER_SIZE, u64::MAX));
assert!(!offset_is_valid(8, HEADER_SIZE, u64::MAX));
assert!(offset_is_valid(HEADER_SIZE, HEADER_SIZE, u64::MAX));
assert!(offset_is_valid(HEADER_SIZE, HEADER_SIZE, HEADER_SIZE + 64));
assert!(!offset_is_valid(HEADER_SIZE + 128, HEADER_SIZE, HEADER_SIZE + 64));
}
// Minimum header sizes per object type; compact DATA gains 8 extra bytes.
#[test]
fn test_minimum_header_size() {
assert_eq!(minimum_header_size(ObjectType::Data, false), DATA_OBJECT_HEADER_SIZE as u64);
assert_eq!(minimum_header_size(ObjectType::Data, true), DATA_OBJECT_HEADER_SIZE as u64 + 8);
assert_eq!(minimum_header_size(ObjectType::Field, false), FIELD_OBJECT_HEADER_SIZE as u64);
assert_eq!(minimum_header_size(ObjectType::Entry, false), ENTRY_OBJECT_HEADER_SIZE as u64);
assert_eq!(
minimum_header_size(ObjectType::EntryArray, false),
ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64
);
}
// A well-formed DATA object header passes validation.
#[test]
fn test_check_object_header_valid() {
assert!(check_object_header(
ObjectType::Data as u8,
DATA_OBJECT_HEADER_SIZE as u64 + 10,
0,
false,
None
)
.is_ok());
}
// Unknown object type tags must be rejected.
#[test]
fn test_check_object_header_invalid_type() {
assert!(check_object_header(0, 100, 0, false, None).is_err());
assert!(check_object_header(99, 100, 0, false, None).is_err());
}
// An object smaller than its type's minimum header must be rejected.
#[test]
fn test_check_object_header_too_small() {
assert!(check_object_header(
ObjectType::Entry as u8,
OBJECT_HEADER_SIZE as u64,
0,
false,
None
)
.is_err());
}
// Field-name validity rules: uppercase start, no leading digit/underscore
// (unless protected names are allowed), no spaces, non-empty.
#[test]
fn test_journal_field_valid() {
assert!(journal_field_valid(b"MESSAGE", true));
assert!(journal_field_valid(b"PRIORITY", true));
assert!(journal_field_valid(b"_SYSTEMD_UNIT", true));
assert!(!journal_field_valid(b"_SYSTEMD_UNIT", false)); assert!(!journal_field_valid(b"message", true)); assert!(!journal_field_valid(b"", true)); assert!(!journal_field_valid(b"1BAD", true)); assert!(!journal_field_valid(b"MY FIELD", true)); }
// Entry-array slot counts: 8-byte items normally, 4-byte when compact.
#[test]
fn test_entry_array_n_items() {
let size = ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + 8 * 10;
assert_eq!(entry_array_n_items(size, false), 10);
let size_compact = ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + 4 * 10;
assert_eq!(entry_array_n_items(size_compact, true), 10);
}
// Entry item counts for regular vs compact item widths.
#[test]
fn test_journal_file_entry_n_items() {
let size = ENTRY_OBJECT_HEADER_SIZE as u64 + ENTRY_ITEM_SIZE as u64 * 5;
assert_eq!(journal_file_entry_n_items(size, false), 5);
let size_compact = ENTRY_OBJECT_HEADER_SIZE as u64 + 4 * 5;
assert_eq!(journal_file_entry_n_items(size_compact, true), 5);
}
// Seqnums increment and wrap past u64::MAX back to 1 (never 0).
#[test]
fn test_inc_seqnum() {
assert_eq!(inc_seqnum(1), 2);
assert_eq!(inc_seqnum(100), 101);
assert_eq!(inc_seqnum(u64::MAX - 1), 1); assert_eq!(inc_seqnum(u64::MAX), 1); }
// Sorting orders items by offset; dedup keeps one item per offset.
#[test]
fn test_entry_item_cmp_and_dedup() {
let mut items = vec![(100u64, 1u64), (50, 2), (100, 3), (50, 4), (200, 5)];
items.sort_by(entry_item_cmp);
assert_eq!(items[0].0, 50);
assert_eq!(items[2].0, 100);
remove_duplicate_entry_items(&mut items);
assert_eq!(items.len(), 3);
assert_eq!(items[0].0, 50);
assert_eq!(items[1].0, 100);
assert_eq!(items[2].0, 200);
}
// A header produced by the writer itself must verify cleanly.
#[test]
fn test_verify_header_valid() {
let path = tmp_path("qjournal_test_verify.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
w.append_entry(&[("MESSAGE", b"test" as &[u8])]).unwrap();
w.flush().unwrap();
let h = w.read_header_raw().unwrap();
let file_size = w.offset;
assert!(verify_header(&h, file_size, true, None).is_ok());
drop(w);
}
// A header with a wrong signature must fail verification.
#[test]
fn test_verify_header_bad_signature() {
let h = Header {
signature: *b"BADMAGIC",
compatible_flags: le32(0),
incompatible_flags: le32(0),
state: FileState::Offline as u8,
reserved: [0; 7],
file_id: [0; 16],
machine_id: [0; 16],
tail_entry_boot_id: [0; 16],
seqnum_id: [0; 16],
header_size: le64(HEADER_SIZE),
arena_size: le64(0),
data_hash_table_offset: le64(0),
data_hash_table_size: le64(0),
field_hash_table_offset: le64(0),
field_hash_table_size: le64(0),
tail_object_offset: le64(0),
n_objects: le64(0),
n_entries: le64(0),
tail_entry_seqnum: le64(0),
head_entry_seqnum: le64(0),
entry_array_offset: le64(0),
head_entry_realtime: le64(0),
tail_entry_realtime: le64(0),
tail_entry_monotonic: le64(0),
n_data: le64(0),
n_fields: le64(0),
n_tags: le64(0),
n_entry_arrays: le64(0),
data_hash_chain_depth: le64(0),
field_hash_chain_depth: le64(0),
tail_entry_array_offset: le32(0),
tail_entry_array_n_entries: le32(0),
tail_entry_offset: le64(0),
};
assert!(verify_header(&h, 1024, false, None).is_err());
}
// A freshly created, empty file should not suggest rotation.
#[test]
fn test_rotate_suggested_empty_file() {
let path = tmp_path("qjournal_test_rotate.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
assert!(!w.journal_file_rotate_suggested(0));
drop(w);
}
// Data hashing is deterministic; keyed and unkeyed hashes differ.
#[test]
fn test_journal_file_hash_data_deterministic() {
let fid = [0u8; 16];
let h1 = journal_file_hash_data(b"MESSAGE=hello", false, &fid);
let h2 = journal_file_hash_data(b"MESSAGE=hello", false, &fid);
assert_eq!(h1, h2);
assert_ne!(h1, journal_file_hash_data(b"MESSAGE=world", false, &fid));
let h3 = journal_file_hash_data(b"MESSAGE=hello", true, &fid);
let h4 = journal_file_hash_data(b"MESSAGE=hello", true, &fid);
assert_eq!(h3, h4);
assert_ne!(h1, h3);
}
// Default hash-table sizing helpers return the configured defaults.
#[test]
fn test_hash_table_sizes() {
assert_eq!(
JournalWriter::setup_data_hash_table_size(0),
DEFAULT_DATA_HASH_TABLE_SIZE as u64
);
assert_eq!(
JournalWriter::setup_field_hash_table_size(),
DEFAULT_FIELD_HASH_TABLE_SIZE as u64
);
}
// Realtime cutoff is None for an empty file and a sane range afterwards.
#[test]
fn test_cutoff_realtime() {
let path = tmp_path("qjournal_test_cutoff_rt.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
assert!(w.journal_file_get_cutoff_realtime_usec().is_none());
w.append_entry(&[("MESSAGE", b"first" as &[u8])]).unwrap();
w.flush().unwrap();
let cutoff = w.journal_file_get_cutoff_realtime_usec();
assert!(cutoff.is_some());
let (from, to) = cutoff.unwrap();
assert!(from > 0);
assert!(to >= from);
drop(w);
}
// Dump and header printing produce the expected human-readable markers.
#[test]
fn test_dump_and_print_header() {
let path = tmp_path("qjournal_test_dump.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
w.append_entry(&[("MESSAGE", b"test" as &[u8])]).unwrap();
w.flush().unwrap();
let dump = w.journal_file_dump().unwrap();
assert!(dump.contains("Journal file dump"));
assert!(dump.contains("Data"));
let header_info = w.journal_file_print_header().unwrap();
assert!(header_info.contains("State: ONLINE"));
assert!(header_info.contains("Entries: 1"));
drop(w);
}
// A payload written via append_data reads back byte-identical.
#[test]
fn test_data_payload_read() {
let path = tmp_path("qjournal_test_payload.journal");
let _ = std::fs::remove_file(&path);
let mut w = JournalWriter::open(&path).unwrap();
let payload = b"MESSAGE=Hello, world!";
let h = journal_file_hash_data(payload, w.keyed_hash, &w.file_id);
let (data_off, _) = w.journal_file_append_data(payload, h).unwrap();
let read_back = w.journal_file_data_payload(data_off).unwrap();
assert_eq!(read_back, payload);
drop(w);
}
// A consistent DATA object passes full object validation.
#[test]
fn test_check_object_data_valid() {
assert!(check_object(
ObjectType::Data,
DATA_OBJECT_HEADER_SIZE as u64 + 10, 0, HEADER_SIZE, false, 12345, 0, 0, 0, 0, 0, 0, 0, 0, &[0u8; 16], 0, 0, )
.is_ok());
}
// A DATA object claiming entries without an entry offset must fail.
#[test]
fn test_check_object_data_bad_entries_mismatch() {
assert!(check_object(
ObjectType::Data,
DATA_OBJECT_HEADER_SIZE as u64 + 10,
0,
HEADER_SIZE,
false,
12345,
0,
0,
8, 0,
0, 0,
0,
0,
&[0u8; 16],
0,
0, )
.is_err());
}
// A minimal well-formed ENTRY object passes validation.
#[test]
fn test_check_object_entry_valid() {
let boot_id = [1u8; 16]; assert!(check_object(
ObjectType::Entry,
ENTRY_OBJECT_HEADER_SIZE as u64 + ENTRY_ITEM_SIZE as u64, 0,
HEADER_SIZE,
false,
0, 0, 0, 0, 0, 0, 1, 1000, 500, &boot_id, 0,
0, )
.is_ok());
}
// An ENTRY object with zero items must be rejected.
#[test]
fn test_check_object_entry_no_items() {
let boot_id = [1u8; 16];
assert!(check_object(
ObjectType::Entry,
ENTRY_OBJECT_HEADER_SIZE as u64, 0,
HEADER_SIZE,
false,
0, 0, 0, 0, 0, 0,
1,
1000,
500,
&boot_id,
0,
0, )
.is_err());
}
// An ENTRY object with an all-zero boot ID must be rejected.
#[test]
fn test_check_object_entry_null_boot_id() {
assert!(check_object(
ObjectType::Entry,
ENTRY_OBJECT_HEADER_SIZE as u64 + ENTRY_ITEM_SIZE as u64,
0,
HEADER_SIZE,
false,
0, 0, 0, 0, 0, 0,
1,
1000,
500,
&[0u8; 16], 0,
0, )
.is_err());
}
// A minimal well-formed ENTRY_ARRAY object passes validation.
#[test]
fn test_check_object_entry_array_valid() {
assert!(check_object(
ObjectType::EntryArray,
ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + 8 * 4, 0,
HEADER_SIZE,
false,
0, 0, 0, 0, 0, 0,
0, 0, 0,
&[0u8; 16],
0, 0, )
.is_ok());
}
// An ENTRY_ARRAY whose next-pointer is invalid must be rejected.
#[test]
fn test_check_object_entry_array_bad_next() {
assert!(check_object(
ObjectType::EntryArray,
ENTRY_ARRAY_OBJECT_HEADER_SIZE as u64 + 8 * 4,
0,
HEADER_SIZE + 64, false,
0, 0, 0, 0, 0, 0,
0, 0, 0,
&[0u8; 16],
HEADER_SIZE, 0, )
.is_err());
}
// Protected (underscore-prefixed) fields are only valid when allowed.
#[test]
fn test_protected_field_validation() {
assert!(journal_field_valid(b"_SYSTEMD_UNIT", true));
assert!(!journal_field_valid(b"_SYSTEMD_UNIT", false));
}
// An entry copied between writers must appear exactly once in the target.
#[test]
fn test_copy_entry() {
let src_path = tmp_path("qjournal_test_copy_src.journal");
let dst_path = tmp_path("qjournal_test_copy_dst.journal");
let _ = std::fs::remove_file(&src_path);
let _ = std::fs::remove_file(&dst_path);
let mut src = JournalWriter::open(&src_path).unwrap();
let entry_off = src
.append_entry(&[
("MESSAGE", b"copied entry" as &[u8]),
("PRIORITY", b"5"),
])
.unwrap();
src.flush().unwrap();
let mut dst = JournalWriter::open(&dst_path).unwrap();
dst.journal_file_copy_entry(&mut src, entry_off, None).unwrap();
dst.flush().unwrap();
assert_eq!(dst.n_entries(), 1);
drop(src);
drop(dst);
}
}