use compact_encoding::{
CompactEncoding, FixedWidthEncoding, FixedWidthU32, as_array_mut, get_slices_checked,
get_slices_mut_checked, map_decode, take_array_mut,
};
use futures::future::Either;
use std::convert::{TryFrom, TryInto};
use crate::common::{BitfieldUpdate, Store, StoreInfo, StoreInfoInstruction};
use crate::tree::MerkleTreeChangeset;
use crate::{HypercoreError, PartialKeypair};
use hypercore_schema::Node;
pub(crate) mod entry;
mod header;
pub(crate) use entry::{Entry, EntryTreeUpgrade};
pub(crate) use header::{Header, HeaderTree};
/// Byte-size limit for the entries area of the oplog (64 KiB). It is not referenced in this
/// part of the file; presumably callers flush once `Oplog::entries_byte_length` grows past
/// it — TODO confirm against the call sites.
pub(crate) const MAX_OPLOG_ENTRIES_BYTE_SIZE: u64 = 65536;
/// Size in bytes of one header slot. The two header slots start at offsets `0` and
/// `HEADER_SIZE`; the entries start at `HEADER_SIZE * 2` (see `OplogSlot`).
const HEADER_SIZE: usize = 4096;
/// Size in bytes of the CRC32 checksum that opens every leader.
const CRC_SIZE: usize = 4;
/// Size in bytes of the `u32` that packs the data length with the partial and header bits.
const LEN_PARTIAL_AND_HEADER_INFO_SIZE: usize = 4;
/// Total size in bytes of the leader written in front of every header and every entry.
const LEADER_SIZE: usize = CRC_SIZE + LEN_PARTIAL_AND_HEADER_INFO_SIZE;
/// In-memory bookkeeping for the oplog: which header bits are currently stored and how many
/// entries (and bytes of entries) have been appended since the last flush.
#[derive(Debug)]
pub(crate) struct Oplog {
/// Header bits of the first and the second header slot, in that order. Whether the two
/// bits are equal decides which slot the next header is written to.
header_bits: [bool; 2],
/// Number of entries appended through `append_entries`; reset to 0 by `flush`.
pub(crate) entries_length: u64,
/// Encoded size in bytes, leaders included, of the appended entries; reset to 0 by `flush`.
/// New entries are written at offset `OplogSlot::Entries + entries_byte_length`.
pub(crate) entries_byte_length: u64,
}
/// Result of an operation that produced an updated header together with pending store
/// operations.
#[derive(Debug)]
pub(crate) struct OplogCreateHeaderOutcome {
/// The header after the operation was applied.
pub(crate) header: Header,
/// Store infos generated by the operation; presumably the caller has to flush them to
/// storage — confirm at the call sites.
pub(crate) infos_to_flush: Box<[StoreInfo]>,
}
/// Everything `Oplog::open` hands back once the oplog content is available.
#[derive(Debug)]
pub(crate) struct OplogOpenOutcome {
/// The opened oplog state.
pub(crate) oplog: Oplog,
/// The header decoded from storage, or the new header when a fresh oplog was created.
pub(crate) header: Header,
/// Store infos produced while opening: empty when an existing header was read, the
/// header write plus truncate when a fresh oplog was created.
pub(crate) infos_to_flush: Box<[StoreInfo]>,
/// Entries decoded from the area after the two header slots. Both constructors set this
/// to `None`; `Oplog::open` fills it in when the stored data extends past the header slots.
pub(crate) entries: Option<Box<[Entry]>>,
}
impl OplogOpenOutcome {
pub(crate) fn new(oplog: Oplog, header: Header, infos_to_flush: Box<[StoreInfo]>) -> Self {
Self {
oplog,
header,
infos_to_flush,
entries: None,
}
}
pub(crate) fn from_create_header_outcome(
oplog: Oplog,
create_header_outcome: OplogCreateHeaderOutcome,
) -> Self {
Self {
oplog,
header: create_header_outcome.header,
infos_to_flush: create_header_outcome.infos_to_flush,
entries: None,
}
}
}
/// Regions of the stored oplog. Each discriminant is the byte offset at which the region
/// starts, so a variant can be cast with `as` and used directly as a storage index.
#[repr(usize)]
enum OplogSlot {
/// First header slot, at the very start of the oplog.
FirstHeader = 0,
/// Second header slot, directly after the first one.
SecondHeader = HEADER_SIZE,
/// Start of the entries, after both header slots.
Entries = HEADER_SIZE * 2,
}
/// What `Oplog::validate_leader` extracts from a buffer whose leader passed the checksum.
#[derive(Debug)]
struct ValidateLeaderOutcome<'a> {
/// All bytes that follow the leader. This is the whole remainder of the validated buffer,
/// not trimmed to the length stored in the leader.
state: &'a [u8],
/// Bit 0 of the packed length-and-info word.
header_bit: bool,
/// Bit 1 of the packed length-and-info word.
partial_bit: bool,
}
/// Header bits a fresh oplog starts from. Because the two bits differ, the first header
/// write goes to the first slot and stores the bit `false`
/// (see `get_next_header_oplog_slot_and_bit_value`).
const INITIAL_HEADER_BITS: [bool; 2] = [true, false];
impl Oplog {
/// Opens the oplog from previously stored bytes, or asks for those bytes when none are given.
///
/// * `info == None`: returns `Either::Left` with
///   `StoreInfoInstruction::new_all_content(Store::Oplog)`; presumably the caller reads the
///   oplog content and calls `open` again with it.
/// * `info == Some(..)`: validates both header slots and decodes one header. When both slots
///   are valid, equal header bits select the first slot and differing bits the second,
///   mirroring the write order of `get_next_header_oplog_slot_and_bit_value`. When neither
///   slot is valid, a fresh oplog is created from `key_pair`. Entries stored after the header
///   slots are decoded until the first leader that does not validate, and trailing entries
///   whose partial bit is set are discarded.
///
/// # Errors
///
/// * `HypercoreError::EmptyStorage` when no valid header is stored and `key_pair` is `None`.
/// * `HypercoreError::InvalidChecksum` when a leader's checksum does not match its data.
/// * Any error raised while decoding a header or an entry.
///
/// # Panics
///
/// Panics when `info.data` is `None`.
pub(crate) fn open(
    key_pair: &Option<PartialKeypair>,
    info: Option<StoreInfo>,
) -> Result<Either<StoreInfoInstruction, OplogOpenOutcome>, HypercoreError> {
    match info {
        None => Ok(Either::Left(StoreInfoInstruction::new_all_content(
            Store::Oplog,
        ))),
        Some(info) => {
            let existing = info.data.expect("Could not get data of existing oplog");

            // Validate both header slots. A slot that does not fit into a short buffer is
            // treated the same as a slot without a valid header.
            let h1_outcome = if let Some(h1) =
                existing.get(OplogSlot::FirstHeader as usize..OplogSlot::SecondHeader as usize)
            {
                Self::validate_leader(h1)?
            } else {
                None
            };
            let h2_outcome = if let Some(h2) =
                existing.get(OplogSlot::SecondHeader as usize..OplogSlot::Entries as usize)
            {
                Self::validate_leader(h2)?
            } else {
                None
            };

            let mut outcome: OplogOpenOutcome = if let Some(h1_outcome) = h1_outcome {
                let (header, header_bits): (Header, [bool; 2]) =
                    if let Some(h2_outcome) = h2_outcome {
                        // Both slots are valid: equal bits mean the first slot was written
                        // last, differing bits mean the second one was.
                        let header_bits = [h1_outcome.header_bit, h2_outcome.header_bit];
                        let header: Header = if header_bits[0] == header_bits[1] {
                            Header::decode(h1_outcome.state)?.0
                        } else {
                            Header::decode(h2_outcome.state)?.0
                        };
                        (header, header_bits)
                    } else {
                        // Only the first slot is valid; equal bits keep it the current one.
                        (
                            Header::decode(h1_outcome.state)?.0,
                            [h1_outcome.header_bit, h1_outcome.header_bit],
                        )
                    };
                let oplog = Oplog {
                    header_bits,
                    entries_length: 0,
                    entries_byte_length: 0,
                };
                OplogOpenOutcome::new(oplog, header, Box::new([]))
            } else if let Some(h2_outcome) = h2_outcome {
                // Only the second slot is valid; differing bits make it the current one.
                let header_bits: [bool; 2] = [!h2_outcome.header_bit, h2_outcome.header_bit];
                let oplog = Oplog {
                    header_bits,
                    entries_length: 0,
                    entries_byte_length: 0,
                };
                OplogOpenOutcome::new(oplog, Header::decode(h2_outcome.state)?.0, Box::new([]))
            } else if let Some(key_pair) = key_pair {
                Self::fresh(key_pair.clone())?
            } else {
                return Err(HypercoreError::EmptyStorage {
                    store: Store::Oplog,
                });
            };

            if existing.len() > OplogSlot::Entries as usize {
                let mut entries_buff =
                    get_slices_checked(&existing, OplogSlot::Entries as usize)?.1;
                let mut entries: Vec<Entry> = Vec::new();
                let mut partials: Vec<bool> = Vec::new();
                while let Some(entry_outcome) = Self::validate_leader(entries_buff)? {
                    let res = Entry::decode(entry_outcome.state)?;
                    entries.push(res.0);
                    entries_buff = res.1;
                    partials.push(entry_outcome.partial_bit);
                }
                // Drop every trailing partial entry. `partials` has to shrink in lock-step
                // with `entries`, otherwise the loop condition never changes.
                while partials.last() == Some(&true) {
                    partials.pop();
                    entries.pop();
                }
                outcome.entries = Some(entries.into_boxed_slice());
            }
            Ok(Either::Right(outcome))
        }
    }
}
/// Applies `changeset` to a copy of `header` and appends the resulting entry to the oplog.
///
/// The passed `header` is left untouched; the updated copy is returned together with the
/// store infos produced by appending the entry. `atomic` is forwarded to `append_entries`.
///
/// # Errors
///
/// Propagates errors from building the entry and from encoding it.
pub(crate) fn append_changeset(
    &mut self,
    changeset: &MerkleTreeChangeset,
    bitfield_update: Option<BitfieldUpdate>,
    atomic: bool,
    header: &Header,
) -> Result<OplogCreateHeaderOutcome, HypercoreError> {
    let mut updated_header = header.clone();
    let entry =
        self.update_header_with_changeset(changeset, bitfield_update, &mut updated_header)?;
    let infos_to_flush = self.append_entries(&[entry], atomic)?;
    Ok(OplogCreateHeaderOutcome {
        header: updated_header,
        infos_to_flush,
    })
}
/// Builds the oplog entry for `changeset` and, for an upgraded changeset, copies the new
/// root hash, signature and length into `header.tree`.
///
/// The entry always carries the changeset's nodes and `bitfield_update`; it carries a tree
/// upgrade only when `changeset.upgraded` is set. `self` is not read.
///
/// # Panics
///
/// Panics when the changeset is upgraded but has no hash or no signature.
pub(crate) fn update_header_with_changeset(
    &self,
    changeset: &MerkleTreeChangeset,
    bitfield_update: Option<BitfieldUpdate>,
    header: &mut Header,
) -> Result<Entry, HypercoreError> {
    let tree_nodes: Vec<Node> = changeset.nodes.clone();

    let tree_upgrade = if changeset.upgraded {
        let root_hash = changeset
            .hash
            .as_ref()
            .expect("Upgraded changeset must have a hash before appended");
        let signature_bytes: Box<[u8]> = changeset
            .signature
            .expect("Upgraded changeset must be signed before appended")
            .to_bytes()
            .into();

        // Keep the header's view of the tree in sync with the upgrade being logged.
        header.tree.root_hash = root_hash.clone();
        header.tree.signature = signature_bytes.clone();
        header.tree.length = changeset.length;

        Some(EntryTreeUpgrade {
            fork: changeset.fork,
            ancestors: changeset.ancestors,
            length: changeset.length,
            signature: signature_bytes,
        })
    } else {
        None
    };

    Ok(Entry {
        user_data: vec![],
        tree_nodes,
        tree_upgrade,
        bitfield: bitfield_update,
    })
}
/// Appends a single, non-atomic entry whose bitfield update drops the range that starts at
/// `start` and has length `end - start`.
///
/// `end` is expected to be at least `start`; the subtraction is unchecked.
///
/// # Errors
///
/// Propagates encoding errors from `append_entries`.
pub(crate) fn clear(
    &mut self,
    start: u64,
    end: u64,
) -> Result<Box<[StoreInfo]>, HypercoreError> {
    let dropped_range = BitfieldUpdate {
        drop: true,
        start,
        length: end - start,
    };
    let entry = Entry {
        user_data: Vec::new(),
        tree_nodes: Vec::new(),
        tree_upgrade: None,
        bitfield: Some(dropped_range),
    };
    self.append_entries(&[entry], false)
}
/// Writes `header` into the next header slot and resets the entry counters to zero.
///
/// With `clear_traces` set, the header is written a second time so that both slots are
/// rewritten. Only the content write of the first pass is kept, followed by the content
/// write and the truncate of the second pass, which yields three store infos. Without
/// `clear_traces` the single content write and its truncate are returned.
///
/// # Errors
///
/// Propagates errors from `insert_header`; the oplog state is left unchanged in that case.
pub(crate) fn flush(
    &mut self,
    header: &Header,
    clear_traces: bool,
) -> Result<Box<[StoreInfo]>, HypercoreError> {
    // Both modes start with the same write into the next slot.
    let (first_bits, first_infos) =
        Self::insert_header(header, 0, self.header_bits, clear_traces)?;

    let (final_bits, infos_to_flush) = if clear_traces {
        // Keep only the content write of the first pass; its truncate would be redundant.
        let mut combined: Vec<StoreInfo> = first_infos.into_vec().drain(0..1).collect();
        let (second_bits, second_infos) =
            Self::insert_header(header, 0, first_bits, clear_traces)?;
        combined.extend(second_infos.into_vec());
        (second_bits, combined.into_boxed_slice())
    } else {
        (first_bits, first_infos)
    };

    self.header_bits = final_bits;
    self.entries_length = 0;
    self.entries_byte_length = 0;
    Ok(infos_to_flush)
}
/// Encodes `batch` back to back, each entry behind its own leader, and returns the single
/// content write that places the bytes after the entries already appended.
///
/// When `atomic` is set, every entry except the last gets its partial bit set. All entries
/// get the current header bit. `entries_length` and `entries_byte_length` are advanced by
/// the size of the batch.
///
/// # Errors
///
/// Propagates errors from sizing or encoding an entry; the counters are not advanced then.
fn append_entries(
    &mut self,
    batch: &[Entry],
    atomic: bool,
) -> Result<Box<[StoreInfo]>, HypercoreError> {
    let entry_count = batch.len();
    let header_bit = self.get_current_header_bit();

    // One leader per entry plus the encoded size of every entry.
    let payload_size = batch
        .iter()
        .map(|entry| entry.encoded_size())
        .sum::<Result<usize, _>>()?;
    let total_size = entry_count * LEADER_SIZE + payload_size;

    let mut buffer = vec![0; total_size];
    let mut remaining = buffer.as_mut_slice();
    for (position, entry) in batch.iter().enumerate() {
        let is_last = position + 1 == entry_count;
        let partial_bit = atomic && !is_last;
        remaining = encode_with_leader(entry, partial_bit, header_bit, remaining)?;
    }

    let write_offset = OplogSlot::Entries as u64 + self.entries_byte_length;
    self.entries_length += entry_count as u64;
    self.entries_byte_length += total_size as u64;

    Ok(vec![StoreInfo::new_content(Store::Oplog, write_offset, &buffer)].into_boxed_slice())
}
fn fresh(key_pair: PartialKeypair) -> Result<OplogOpenOutcome, HypercoreError> {
let entries_length: u64 = 0;
let entries_byte_length: u64 = 0;
let header = Header::new(key_pair);
let (header_bits, infos_to_flush) =
Self::insert_header(&header, entries_byte_length, INITIAL_HEADER_BITS, false)?;
let oplog = Oplog {
header_bits,
entries_length,
entries_byte_length,
};
Ok(OplogOpenOutcome::from_create_header_outcome(
oplog,
OplogCreateHeaderOutcome {
header,
infos_to_flush,
},
))
}
/// Encodes `header` for the next header slot and builds the store infos that persist it.
///
/// The slot and the header bit to store are chosen from `current_header_bits` by
/// `get_next_header_oplog_slot_and_bit_value`. The returned bits are `current_header_bits`
/// with the chosen slot's bit replaced.
///
/// Two store infos are returned, in this order:
/// 1. a content write of the encoded header at the slot's offset,
/// 2. a truncate at `OplogSlot::Entries + entries_byte_length`.
///
/// With `clear_traces` set, the written buffer spans the whole `HEADER_SIZE` slot and is
/// zero-padded after the header, so nothing older remains in the slot. Otherwise the buffer
/// holds exactly one leader plus the encoded header.
///
/// # Errors
///
/// Propagates errors from sizing or encoding the header, including a header that does not
/// fit into the buffer.
fn insert_header(
    header: &Header,
    entries_byte_length: u64,
    current_header_bits: [bool; 2],
    clear_traces: bool,
) -> Result<([bool; 2], Box<[StoreInfo]>), HypercoreError> {
    let (oplog_slot, header_bit) =
        Oplog::get_next_header_oplog_slot_and_bit_value(&current_header_bits);

    let mut new_header_bits = current_header_bits;
    match oplog_slot {
        OplogSlot::FirstHeader => new_header_bits[0] = header_bit,
        OplogSlot::SecondHeader => new_header_bits[1] = header_bit,
        // Headers are never stored in the entries area.
        OplogSlot::Entries => panic!("Invalid oplog slot"),
    }

    // `encode_with_leader` writes `LEADER_SIZE + encoded_size` bytes, so that is all the
    // room a plain write needs; anything larger could spill into the neighbouring slot.
    let encoded_size = LEADER_SIZE + header.encoded_size()?;
    let size = if clear_traces {
        HEADER_SIZE
    } else {
        encoded_size
    };

    let mut buffer = vec![0; size];
    encode_with_leader(header, false, header_bit, &mut buffer)?;

    let truncate_index = OplogSlot::Entries as u64 + entries_byte_length;
    Ok((
        new_header_bits,
        vec![
            StoreInfo::new_content(Store::Oplog, oplog_slot as u64, &buffer),
            StoreInfo::new_truncate(Store::Oplog, truncate_index),
        ]
        .into_boxed_slice(),
    ))
}
/// Reads the leader at the start of `buffer` and checks the checksum of the data behind it.
///
/// Returns `Ok(None)` when no complete record is present: the buffer is shorter than a
/// leader, the stored length is zero, or fewer bytes follow the leader than the stored
/// length. On success the outcome holds the header bit, the partial bit and all bytes after
/// the leader.
///
/// # Errors
///
/// Returns `HypercoreError::InvalidChecksum` when the CRC32 over the length-and-info word
/// plus the data differs from the stored checksum. Decoding errors are propagated.
///
/// # Panics
///
/// Panics when the stored length does not fit into `usize`.
fn validate_leader(buffer: &[u8]) -> Result<Option<ValidateLeaderOutcome<'_>>, HypercoreError> {
    if buffer.len() < LEADER_SIZE {
        return Ok(None);
    }

    let ((stored_checksum, combined), data_buff) =
        map_decode!(buffer, [FixedWidthU32<'_>, FixedWidthU32<'_>]);

    // The two lowest bits are flags, the remaining 30 bits are the data length.
    let len = usize::try_from(combined >> 2)
        .expect("Attempted converting to a 32 bit usize on below 32 bit system");
    let record_is_missing = len == 0 || data_buff.len() < len;
    if record_is_missing {
        return Ok(None);
    }

    // The checksum covers everything after the CRC itself: the packed word and the data.
    let calculated_checksum = crc32fast::hash(&buffer[CRC_SIZE..LEADER_SIZE + len]);
    if calculated_checksum == stored_checksum {
        Ok(Some(ValidateLeaderOutcome {
            state: data_buff,
            header_bit: combined & 1 == 1,
            partial_bit: combined & 2 == 2,
        }))
    } else {
        Err(HypercoreError::InvalidChecksum {
            context: format!(
                "Calculated signature [{calculated_checksum}] does not match oplog signature [{stored_checksum}]"
            ),
        })
    }
}
/// Returns the header bit that newly appended entries are stamped with: `true` exactly
/// when the two stored header bits differ.
fn get_current_header_bit(&self) -> bool {
    let [first, second] = self.header_bits;
    first != second
}
/// Picks the slot the next header is written to and the header bit it gets.
///
/// Equal bits select the second slot, differing bits the first; in both cases the chosen
/// slot's bit is flipped. Starting from `[true, false]` the bits therefore cycle
/// `[false, false]` (first) → `[false, true]` (second) → `[true, true]` (first) →
/// `[true, false]` (second), so the writes alternate between the two slots.
fn get_next_header_oplog_slot_and_bit_value(header_bits: &[bool; 2]) -> (OplogSlot, bool) {
    let [first, second] = *header_bits;
    if first == second {
        (OplogSlot::SecondHeader, !second)
    } else {
        (OplogSlot::FirstHeader, !first)
    }
}
}
/// Packs the data length and the two flag bits into the `u32` stored in a leader.
///
/// Layout: bit 0 is the header bit, bit 1 is the partial bit, and the upper 30 bits hold
/// `data_length`.
///
/// # Panics
///
/// Panics when `data_length` does not fit into `u32`, or when it needs more than 30 bits.
fn build_len_and_info_header(data_length: usize, header_bit: bool, partial_bit: bool) -> u32 {
    // The two highest bits must be free, because the length is shifted left by two.
    const TOP_TWO_BITS: u32 = 0b11 << 30;

    let length: u32 = data_length
        .try_into()
        .expect("Must be able to convert usize to u32");
    assert!(
        (length & TOP_TWO_BITS) == 0,
        "Data length would overflow. It does not fit in 30 bits"
    );

    let flags = (u32::from(partial_bit) << 1) | u32::from(header_bit);
    (length << 2) | flags
}
/// Fills in the two halves of a leader for `data`.
///
/// The packed length-and-info word is written to `len_and_meta_zone` first. The CRC32 over
/// that word followed by `data` is then written to `crc_zone`.
///
/// # Errors
///
/// Propagates encoding errors from writing either half.
fn write_leader_parts(
    header_bit: bool,
    partial_bit: bool,
    crc_zone: &mut [u8; CRC_SIZE],
    len_and_meta_zone: &mut [u8; LEN_PARTIAL_AND_HEADER_INFO_SIZE],
    data: &[u8],
) -> Result<(), HypercoreError> {
    let packed_word = build_len_and_info_header(data.len(), header_bit, partial_bit);
    packed_word.as_fixed_width().encode(len_and_meta_zone)?;

    // The checksum is taken over the encoded word, so it has to be written before hashing.
    let mut crc = crc32fast::Hasher::new();
    crc.update(len_and_meta_zone);
    crc.update(data);
    let checksum = crc.finalize();

    checksum.as_fixed_width().encode(crc_zone)?;
    Ok(())
}
/// Encodes `thing` into the front of `buffer`, preceded by its leader, and returns the
/// unused tail of `buffer`.
///
/// The first `LEADER_SIZE` bytes receive the checksum and the packed length-and-info word;
/// the next `thing.encoded_size()` bytes receive the encoded value.
///
/// # Errors
///
/// Propagates errors when `buffer` is too small for the leader or the value, and any error
/// from sizing or encoding `thing`.
fn encode_with_leader<'a>(
    thing: &impl CompactEncoding,
    partial_bit: bool,
    header_bit: bool,
    buffer: &'a mut [u8],
) -> Result<&'a mut [u8], HypercoreError> {
    // Carve the buffer into leader | payload | tail.
    let (leader, after_leader) = take_array_mut::<LEADER_SIZE>(buffer)?;
    let (payload, tail) = get_slices_mut_checked(after_leader, thing.encoded_size()?)?;
    let (crc_zone, len_and_meta_zone) = get_slices_mut_checked(leader, CRC_SIZE)?;

    // The payload has to be in place first, because the leader's checksum covers it.
    thing.encode(payload)?;
    write_leader_parts(
        header_bit,
        partial_bit,
        as_array_mut::<CRC_SIZE>(crc_zone)?,
        as_array_mut::<LEN_PARTIAL_AND_HEADER_INFO_SIZE>(len_and_meta_zone)?,
        payload,
    )?;
    Ok(tail)
}