use std::path::{Path, PathBuf};
use citadel_core::{
Error, Result, KEY_SIZE, REGION_STORE_MAGIC, REGION_STORE_PREALLOC_SLOTS, REGION_STORE_VERSION,
WRAPPED_KEY_SIZE,
};
use citadel_io::durable::{append_and_sync, overwrite_in_place, truncate_and_sync, write_and_sync};
use zeroize::Zeroizing;
use crate::key_codec::{
self, build_slot_block, empty_slot_block, header_offset, parse_slot_block, slot_offset,
SlotRecord, SlotState, BLOCK,
};
#[cfg(test)]
use crate::key_codec::{HEADER_MAC_INPUT, SLOT_MAC_INPUT};
const GROW_SLOTS: u32 = REGION_STORE_PREALLOC_SLOTS;
fn build_header_block(
mac_key: &[u8; KEY_SIZE],
file_id: u64,
slot_count: u32,
gen: u64,
) -> [u8; BLOCK] {
key_codec::build_header_block(
mac_key,
REGION_STORE_MAGIC,
REGION_STORE_VERSION,
file_id,
slot_count,
gen,
)
}
fn parse_header_block(mac_key: &[u8; KEY_SIZE], file_id: u64, b: &[u8]) -> Option<(u32, u64)> {
key_codec::parse_header_block(
mac_key,
REGION_STORE_MAGIC,
REGION_STORE_VERSION,
file_id,
b,
)
}
pub(crate) struct RegionKeyStore {
path: PathBuf,
file_id: u64,
mac_key: Zeroizing<[u8; KEY_SIZE]>,
slot_count: u32,
}
impl std::fmt::Debug for RegionKeyStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RegionKeyStore")
.field("path", &self.path)
.field("file_id", &self.file_id)
.field("slot_count", &self.slot_count)
.finish_non_exhaustive()
}
}
struct SlotView {
record: SlotRecord,
authoritative_b: bool,
max_gen: u64,
}
impl RegionKeyStore {
pub(crate) fn create_or_open(
path: &Path,
file_id: u64,
mac_key: [u8; KEY_SIZE],
) -> Result<Self> {
let mac_key = Zeroizing::new(mac_key);
if path.exists() {
let bytes = std::fs::read(path)?;
if bytes.len() < 2 * BLOCK {
return Err(Error::RegionStoreCorrupt(
"store smaller than header".into(),
));
}
let a = parse_header_block(&mac_key, file_id, &bytes[header_offset(false) as usize..]);
let b = parse_header_block(&mac_key, file_id, &bytes[header_offset(true) as usize..]);
let slot_count = match (a, b) {
(Some((sa, ga)), Some((sb, gb))) => {
if ga >= gb {
sa
} else {
sb
}
}
(Some((s, _)), None) | (None, Some((s, _))) => s,
(None, None) => {
return Err(Error::RegionStoreCorrupt(
"no valid header copy (wrong key or corrupt store)".into(),
))
}
};
let on_disk = ((bytes.len() - 2 * BLOCK) / (2 * BLOCK)) as u32;
let slot_count = slot_count.min(on_disk);
let aligned_len = (2 + 2 * slot_count as usize) * BLOCK;
if bytes.len() != aligned_len {
truncate_and_sync(path, aligned_len as u64)?;
}
Ok(Self {
path: path.to_path_buf(),
file_id,
mac_key,
slot_count,
})
} else {
let slot_count = REGION_STORE_PREALLOC_SLOTS;
let mut buf = Vec::with_capacity((2 + 2 * slot_count as usize) * BLOCK);
let hdr = build_header_block(&mac_key, file_id, slot_count, 1);
buf.extend_from_slice(&hdr);
buf.extend_from_slice(&hdr);
let empty = empty_slot_block(&mac_key);
for _ in 0..slot_count {
buf.extend_from_slice(&empty);
buf.extend_from_slice(&empty);
}
write_and_sync(path, &buf)?;
Ok(Self {
path: path.to_path_buf(),
file_id,
mac_key,
slot_count,
})
}
}
fn read_file(&self) -> Result<Vec<u8>> {
Ok(std::fs::read(&self.path)?)
}
fn view(&self, bytes: &[u8], i: u32) -> Result<SlotView> {
let off_a = slot_offset(i, false) as usize;
let off_b = slot_offset(i, true) as usize;
if bytes.len() < off_b + BLOCK {
return Err(Error::RegionStoreCorrupt(format!("slot {i} out of bounds")));
}
let a = parse_slot_block(&self.mac_key, &bytes[off_a..off_a + BLOCK]);
let b = parse_slot_block(&self.mac_key, &bytes[off_b..off_b + BLOCK]);
match (a, b) {
(Some(ra), Some(rb)) => {
if rb.gen > ra.gen {
Ok(SlotView {
record: rb,
authoritative_b: true,
max_gen: rb.gen,
})
} else {
Ok(SlotView {
record: ra,
authoritative_b: false,
max_gen: ra.gen,
})
}
}
(Some(ra), None) => Ok(SlotView {
record: ra,
authoritative_b: false,
max_gen: ra.gen,
}),
(None, Some(rb)) => Ok(SlotView {
record: rb,
authoritative_b: true,
max_gen: rb.gen,
}),
(None, None) => Err(Error::RegionStoreCorrupt(format!(
"slot {i} has no valid copy"
))),
}
}
pub(crate) fn read_slot(&self, i: u32) -> Result<SlotRecord> {
let bytes = self.read_file()?;
Ok(self.view(&bytes, i)?.record)
}
pub(crate) fn live_owners(&self) -> Result<Vec<(u32, u64)>> {
let bytes = self.read_file()?;
let mut live = Vec::new();
for i in 0..self.slot_count {
let rec = self.view(&bytes, i)?.record;
if rec.state == SlotState::Live {
live.push((i, rec.region_id));
}
}
Ok(live)
}
pub(crate) fn allocate_slot(&mut self) -> Result<u32> {
let bytes = self.read_file()?;
for i in 0..self.slot_count {
let st = self.view(&bytes, i)?.record.state;
if st == SlotState::Empty || st == SlotState::Tombstone {
return Ok(i);
}
}
self.grow()?;
Ok(self.slot_count - GROW_SLOTS)
}
fn grow(&mut self) -> Result<()> {
let empty = empty_slot_block(&self.mac_key);
let mut tail = Vec::with_capacity(GROW_SLOTS as usize * 2 * BLOCK);
for _ in 0..GROW_SLOTS {
tail.extend_from_slice(&empty);
tail.extend_from_slice(&empty);
}
append_and_sync(&self.path, &tail)?;
let new_count = self.slot_count + GROW_SLOTS;
let bytes = self.read_file()?;
let gen = self.header_gen(&bytes)?.saturating_add(1);
let hdr = build_header_block(&self.mac_key, self.file_id, new_count, gen);
overwrite_in_place(&self.path, header_offset(false), &hdr)?;
overwrite_in_place(&self.path, header_offset(true), &hdr)?;
self.slot_count = new_count;
Ok(())
}
fn header_gen(&self, bytes: &[u8]) -> Result<u64> {
let a = parse_header_block(
&self.mac_key,
self.file_id,
&bytes[header_offset(false) as usize..],
);
let b = parse_header_block(
&self.mac_key,
self.file_id,
&bytes[header_offset(true) as usize..],
);
match (a, b) {
(Some((_, ga)), Some((_, gb))) => Ok(ga.max(gb)),
(Some((_, g)), None) | (None, Some((_, g))) => Ok(g),
(None, None) => Err(Error::RegionStoreCorrupt("no valid header copy".into())),
}
}
pub(crate) fn write_live(
&self,
slot: u32,
region_id: u64,
wrapped: &[u8; WRAPPED_KEY_SIZE],
) -> Result<u64> {
let bytes = self.read_file()?;
let view = self.view(&bytes, slot)?;
let new_gen = view.max_gen + 1;
let block = build_slot_block(&self.mac_key, SlotState::Live, region_id, new_gen, wrapped);
let target_b = !view.authoritative_b;
let off = slot_offset(slot, target_b);
overwrite_in_place(&self.path, off, &block)?;
let confirm = std::fs::read(&self.path)?;
let o = off as usize;
match parse_slot_block(&self.mac_key, &confirm[o..o + BLOCK]) {
Some(r) if r.state == SlotState::Live && r.gen == new_gen => {}
_ => {
return Err(Error::RegionStoreCorrupt(format!(
"write_live of slot {slot} did not persist"
)))
}
}
Ok(new_gen)
}
pub(crate) fn tombstone(&self, slot: u32, expected_region_id: u64) -> Result<()> {
let bytes = self.read_file()?;
let view = self.view(&bytes, slot)?;
match view.record.state {
SlotState::Tombstone => return Ok(()),
SlotState::Empty => {
return Err(Error::RegionStoreCorrupt(format!(
"forget of slot {slot} which holds no live key"
)))
}
SlotState::Live => {}
}
if view.record.region_id != expected_region_id {
return Err(Error::RegionStoreCorrupt(format!(
"slot {slot} holds region {} not {expected_region_id}",
view.record.region_id
)));
}
let new_gen = view.max_gen + 1;
let tomb = build_slot_block(
&self.mac_key,
SlotState::Tombstone,
0,
new_gen,
&[0u8; WRAPPED_KEY_SIZE],
);
let live_copy_b = view.authoritative_b;
overwrite_in_place(&self.path, slot_offset(slot, live_copy_b), &tomb)?;
let confirm = std::fs::read(&self.path)?;
let off = slot_offset(slot, live_copy_b) as usize;
match parse_slot_block(&self.mac_key, &confirm[off..off + BLOCK]) {
Some(r) if r.state == SlotState::Tombstone && r.gen == new_gen => {}
_ => {
return Err(Error::RegionStoreCorrupt(format!(
"tombstone of slot {slot} did not persist"
)))
}
}
overwrite_in_place(&self.path, slot_offset(slot, !live_copy_b), &tomb)?;
Ok(())
}
#[cfg(test)]
pub(crate) fn slot_count(&self) -> u32 {
self.slot_count
}
}
#[cfg(test)]
#[path = "region_store_tests.rs"]
mod tests;