use std::sync::Arc;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{pmmr, pmmr::ReadablePMMR};
use crate::core::core::{
BlockHeader, BlockSums, OutputIdentifier, Segment, SegmentIdentifier, SegmentType,
SegmentTypeIdentifier, TxKernel,
};
use crate::error::Error;
use crate::txhashset::{BitmapAccumulator, BitmapChunk, TxHashSet};
use crate::types::{Tip, TxHashsetWriteStatus};
use crate::util::secp::pedersen::RangeProof;
use crate::util::{RwLock, StopState};
use crate::SyncState;
use crate::pibd_params;
use crate::store;
use crate::txhashset;
use croaring::Bitmap;
/// Orchestrates PIBD (Parallelized Initial Block Download) state
/// reconstruction: receives, validates, caches and applies MMR segments
/// (bitmap, outputs, rangeproofs, kernels) until the local txhashset
/// matches the archive header.
#[derive(Clone)]
pub struct Desegmenter {
// Shared txhashset being rebuilt.
txhashset: Arc<RwLock<TxHashSet>>,
// Header PMMR used for header lookups during progress checks / validation.
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
// Horizon header whose roots/sizes the reconstructed state must match.
archive_header: BlockHeader,
// Chain store for batches (pibd head, body head/tail, block sums).
store: Arc<store::ChainStore>,
// Genesis header, needed by full validation.
genesis: BlockHeader,
// Segment heights (log2 of leaf count per segment) per MMR type.
default_bitmap_segment_height: u8,
default_output_segment_height: u8,
default_rangeproof_segment_height: u8,
default_kernel_segment_height: u8,
// Max number of contiguous segments applied per extension batch.
segment_apply_batch_size: usize,
// Accumulates received bitmap chunks into the output bitmap MMR.
bitmap_accumulator: BitmapAccumulator,
// Out-of-order arrival caches, one per segment type.
bitmap_segment_cache: Vec<Segment<BitmapChunk>>,
output_segment_cache: Vec<Segment<OutputIdentifier>>,
rangeproof_segment_cache: Vec<Segment<RangeProof>>,
kernel_segment_cache: Vec<Segment<TxKernel>>,
// Expected bitmap MMR dimensions, derived from the archive header.
bitmap_mmr_leaf_count: u64,
bitmap_mmr_size: u64,
// Cap on each cache's size before stale entries are dropped.
max_cached_segments: usize,
// Finalized output bitmap, populated once all bitmap segments applied.
bitmap_cache: Option<Bitmap>,
// Set once no further segments are desired.
all_segments_complete: bool,
// Height of the latest block fully represented by applied segments.
latest_block_height: u64,
}
impl Desegmenter {
/// Builds a `Desegmenter` targeting `archive_header`, with segment
/// heights and batch sizes taken from `pibd_params`. The expected
/// bitmap MMR dimensions are precomputed before returning.
pub fn new(
	txhashset: Arc<RwLock<TxHashSet>>,
	header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
	archive_header: BlockHeader,
	genesis: BlockHeader,
	store: Arc<store::ChainStore>,
) -> Desegmenter {
	trace!("Creating new desegmenter");
	let mut desegmenter = Desegmenter {
		txhashset,
		header_pmmr,
		archive_header,
		store,
		genesis,
		bitmap_accumulator: BitmapAccumulator::new(),
		default_bitmap_segment_height: pibd_params::BITMAP_SEGMENT_HEIGHT,
		default_output_segment_height: pibd_params::OUTPUT_SEGMENT_HEIGHT,
		default_rangeproof_segment_height: pibd_params::RANGEPROOF_SEGMENT_HEIGHT,
		default_kernel_segment_height: pibd_params::KERNEL_SEGMENT_HEIGHT,
		segment_apply_batch_size: pibd_params::SEGMENT_APPLY_BATCH_SIZE,
		bitmap_segment_cache: Vec::new(),
		output_segment_cache: Vec::new(),
		rangeproof_segment_cache: Vec::new(),
		kernel_segment_cache: Vec::new(),
		bitmap_mmr_leaf_count: 0,
		bitmap_mmr_size: 0,
		max_cached_segments: pibd_params::MAX_CACHED_SEGMENTS,
		bitmap_cache: None,
		all_segments_complete: false,
		latest_block_height: 0,
	};
	// Derive expected bitmap MMR leaf count / size from the archive header.
	desegmenter.calc_bitmap_mmr_sizes();
	desegmenter
}
/// Discards all cached segments and accumulated bitmap state so the
/// PIBD process can restart from scratch, then recomputes the expected
/// bitmap MMR sizes.
pub fn reset(&mut self) {
	self.all_segments_complete = false;
	self.bitmap_segment_cache.clear();
	self.output_segment_cache.clear();
	self.rangeproof_segment_cache.clear();
	self.kernel_segment_cache.clear();
	self.bitmap_mmr_leaf_count = 0;
	self.bitmap_mmr_size = 0;
	self.bitmap_cache = None;
	self.bitmap_accumulator = BitmapAccumulator::new();
	self.latest_block_height = 0;
	// Recompute last: sizes derive from the (unchanged) archive header.
	self.calc_bitmap_mmr_sizes();
}
/// The archive (horizon) header this desegmenter is reconstructing toward.
pub fn header(&self) -> &BlockHeader {
	&self.archive_header
}
/// Expected size of the fully-populated bitmap MMR, as computed from
/// the archive header's output MMR size.
pub fn expected_bitmap_mmr_size(&self) -> u64 {
	self.bitmap_mmr_size
}
/// Whether every desired segment has been received and applied.
pub fn is_complete(&self) -> bool {
	self.all_segments_complete
}
/// Checks how far segment application has progressed, persists the
/// latest fully-covered block as the PIBD head, and pushes a progress
/// update into `status`.
///
/// Returns `Ok(true)` once all three data MMRs have reached the sizes
/// recorded in the archive header AND the output bitmap has been
/// finalized; `Ok(false)` otherwise.
pub fn check_progress(&mut self, status: Arc<SyncState>) -> Result<bool, Error> {
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
// Snapshot current MMR sizes under a short-lived read lock.
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
// Progress metric reported to the UI: total leaves applied so far
// across all three MMRs.
let completed_leaves = pmmr::n_leaves(local_output_mmr_size)
+ pmmr::n_leaves(local_rangeproof_mmr_size)
+ pmmr::n_leaves(local_kernel_mmr_size);
// Outputs and rangeproofs advance independently; only data covered by
// BOTH counts as applied, hence the min.
let latest_output_size = std::cmp::min(local_output_mmr_size, local_rangeproof_mmr_size);
// Find the highest header whose output/kernel MMR sizes are fully
// covered by what we've applied, searching forward from the last
// known height.
let res = {
let header_pmmr = self.header_pmmr.read();
header_pmmr.get_first_header_with(
latest_output_size,
local_kernel_mmr_size,
self.latest_block_height,
self.store.clone(),
)
};
if let Some(h) = res {
self.latest_block_height = h.height;
// Persist progress so an interrupted sync can resume from here.
let tip = Tip::from_header(&h);
let batch = self.store.batch()?;
batch.save_pibd_head(&tip)?;
batch.commit()?;
status.update_pibd_progress(
false,
false,
completed_leaves,
self.latest_block_height,
&self.archive_header,
);
// NOTE(review): rangeproof size is compared against
// `output_mmr_size` — presumably the rangeproof MMR mirrors the
// output MMR leaf-for-leaf; confirm against the txhashset design.
if local_kernel_mmr_size == self.archive_header.kernel_mmr_size
&& local_output_mmr_size == self.archive_header.output_mmr_size
&& local_rangeproof_mmr_size == self.archive_header.output_mmr_size
&& self.bitmap_cache.is_some()
{
return Ok(true);
}
}
Ok(false)
}
/// If the output bitmap has been finalized, pushes it into the
/// txhashset extension so the leaf sets reflect the downloaded
/// spent/unspent state.
pub fn check_update_leaf_set_state(&self) -> Result<(), Error> {
	let mut header_pmmr = self.header_pmmr.write();
	let mut txhashset = self.txhashset.write();
	let mut batch = self.store.batch()?;
	txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _| {
		if let Some(bitmap) = &self.bitmap_cache {
			ext.extension.update_leaf_sets(bitmap)?;
		}
		Ok(())
	})?;
	Ok(())
}
/// Validates the fully-downloaded txhashset against the archive header,
/// rebuilds indices and commits the final chain head.
///
/// Phases:
/// 1. Validate MMR roots against the archive header (readonly).
/// 2. Rewind through all headers validating the kernel MMR root at each
///    height (readonly).
/// 3. Verify the kernel position index from genesis.
/// 4. Rewind a second time (writeable), run full validation (sums,
///    rangeproofs, kernel signatures), save block sums and commit the
///    body head/tail plus rebuilt position indices.
///
/// `stop_state` is polled between (and within) phases; an early stop
/// returns `Ok(())` without committing the final batch.
pub fn validate_complete_state(
	&self,
	status: Arc<SyncState>,
	stop_state: Arc<StopState>,
) -> Result<(), Error> {
	// Phase 1: cheap root check before the expensive work.
	{
		let txhashset = self.txhashset.read();
		txhashset.roots()?.validate(&self.archive_header)?;
	}
	// Rangeproof validation always starts from position 0.
	let last_rangeproof_validation_pos = 0;
	// Phase 2: rewind and validate kernel history (readonly).
	{
		debug!("desegmenter validation: rewinding and validating kernel history (readonly)");
		let txhashset = self.txhashset.read();
		let mut count = 0;
		let mut current = self.archive_header.clone();
		let total = current.height;
		txhashset::rewindable_kernel_view(&txhashset, |view, batch| {
			while current.height > 0 {
				// FIX: these two calls previously read `¤t` — a
				// mojibake of `&current` (HTML entity corruption) that
				// does not compile.
				view.rewind(&current)?;
				view.validate_root()?;
				current = batch.get_previous_header(&current)?;
				count += 1;
				// Periodic progress updates so the UI doesn't stall.
				if current.height % 100000 == 0 || current.height == total {
					status.on_setup(Some(total - current.height), Some(total), None, None);
				}
				if stop_state.is_stopped() {
					return Ok(());
				}
			}
			Ok(())
		})?;
		debug!(
			"desegmenter validation: validated kernel root on {} headers",
			count,
		);
	}
	if stop_state.is_stopped() {
		return Ok(());
	}
	// Phase 3: verify the kernel position index.
	{
		let header_pmmr = self.header_pmmr.read();
		let txhashset = self.txhashset.read();
		let batch = self.store.batch()?;
		txhashset.verify_kernel_pos_index(
			&self.genesis,
			&header_pmmr,
			&batch,
			Some(status.clone()),
			Some(stop_state.clone()),
		)?;
	}
	if stop_state.is_stopped() {
		return Ok(());
	}
	status.on_setup(None, None, None, None);
	// Phase 4: writeable rewind, full validation, then commit heads.
	{
		debug!("desegmenter validation: rewinding a 2nd time (writeable)");
		let mut header_pmmr = self.header_pmmr.write();
		let mut txhashset = self.txhashset.write();
		let mut batch = self.store.batch()?;
		txhashset::extending(
			&mut header_pmmr,
			&mut txhashset,
			&mut batch,
			|ext, batch| {
				let extension = &mut ext.extension;
				extension.rewind(&self.archive_header, batch)?;
				// Full validation: sums, rangeproofs, kernel signatures.
				let (utxo_sum, kernel_sum) = extension.validate(
					&self.genesis,
					false,
					&*status,
					Some(last_rangeproof_validation_pos),
					None,
					&self.archive_header,
					Some(stop_state.clone()),
				)?;
				if stop_state.is_stopped() {
					return Ok(());
				}
				batch.save_block_sums(
					&self.archive_header.hash(),
					BlockSums {
						utxo_sum,
						kernel_sum,
					},
				)?;
				Ok(())
			},
		)?;
		if stop_state.is_stopped() {
			return Ok(());
		}
		debug!("desegmenter_validation: finished validating and rebuilding");
		status.on_save();
		{
			let tip = Tip::from_header(&self.archive_header);
			batch.save_body_head(&tip)?;
			batch.save_body_tail(&tip)?;
		}
		// Rebuild position indices before committing everything at once.
		txhashset.init_output_pos_index(&header_pmmr, &batch)?;
		txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?;
		batch.commit()?;
		debug!("desegmenter_validation: finished committing the batch (head etc.)");
		status.on_done();
	}
	Ok(())
}
/// Applies whichever cached segments are next required, in dependency
/// order: the output bitmap first (one segment at a time), then —
/// once the bitmap is finalized — contiguous batches of output,
/// rangeproof and kernel segments.
///
/// If a required segment hasn't arrived yet this logs and returns;
/// a cache that reaches `max_cached_segments` while nothing in it is
/// applicable is dropped so stale segments don't pin memory.
pub fn apply_next_segments(&mut self) -> Result<(), Error> {
	let next_bmp_idx = self.next_required_bitmap_segment_index();
	if let Some(bmp_idx) = next_bmp_idx {
		// Bitmap segments must be appended strictly in order.
		if let Some((idx, _seg)) = self
			.bitmap_segment_cache
			.iter()
			.enumerate()
			.find(|s| s.1.identifier().idx == bmp_idx)
		{
			self.apply_bitmap_segment(idx)?;
		} else {
			debug!(
				"desegmenter: waiting for bitmap segment idx {} (cache size {})",
				bmp_idx,
				self.bitmap_segment_cache.len()
			);
		}
	} else {
		// All bitmap segments applied; finalize the bitmap exactly once.
		// FIX: was `self.bitmap_cache == None`, which needlessly requires
		// `PartialEq` on the contained `Bitmap` (clippy: partialeq_to_none).
		if self.bitmap_cache.is_none() {
			self.finalize_bitmap()?;
		}
		// Outputs: apply the next contiguous batch, if present.
		if let Some(next_output_idx) = self.next_required_output_segment_index() {
			let segments = Self::take_segment_batch(
				&mut self.output_segment_cache,
				next_output_idx,
				self.segment_apply_batch_size,
			);
			if segments.is_empty() {
				debug!(
					"desegmenter: waiting for output segment idx {} (cache size {})",
					next_output_idx,
					self.output_segment_cache.len()
				);
			} else {
				self.apply_output_segments(segments)?;
			}
		} else if self.output_segment_cache.len() >= self.max_cached_segments {
			debug!(
				"desegmenter: dropping {} cached output segments waiting for next requirement",
				self.output_segment_cache.len()
			);
			self.output_segment_cache = vec![];
		}
		// Rangeproofs: same batching strategy.
		if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() {
			let segments = Self::take_segment_batch(
				&mut self.rangeproof_segment_cache,
				next_rp_idx,
				self.segment_apply_batch_size,
			);
			if segments.is_empty() {
				debug!(
					"desegmenter: waiting for rangeproof segment idx {} (cache size {})",
					next_rp_idx,
					self.rangeproof_segment_cache.len()
				);
			} else {
				self.apply_rangeproof_segments(segments)?;
			}
		} else if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
			debug!(
				"desegmenter: dropping {} cached rangeproof segments waiting for next requirement",
				self.rangeproof_segment_cache.len()
			);
			self.rangeproof_segment_cache = vec![];
		}
		// Kernels: same batching strategy.
		if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() {
			let segments = Self::take_segment_batch(
				&mut self.kernel_segment_cache,
				next_kernel_idx,
				self.segment_apply_batch_size,
			);
			if segments.is_empty() {
				debug!(
					"desegmenter: waiting for kernel segment idx {} (cache size {})",
					next_kernel_idx,
					self.kernel_segment_cache.len()
				);
			} else {
				self.apply_kernel_segments(segments)?;
			}
		} else if self.kernel_segment_cache.len() >= self.max_cached_segments {
			debug!(
				"desegmenter: dropping {} cached kernel segments waiting for next requirement",
				self.kernel_segment_cache.len()
			);
			self.kernel_segment_cache = vec![];
		}
	}
	Ok(())
}
/// Plans which segments to request next from peers, returning at most
/// `max_elements` segment identifiers.
///
/// While the bitmap is incomplete only bitmap segments are requested.
/// Afterwards the budget is split three ways between output, rangeproof
/// and kernel segments (each capped at `max_elements / 3`), skipping
/// segments already cached or already applied. If no requests remain
/// and the bitmap is finalized, the desegmenter is marked complete.
pub fn next_desired_segments(&mut self, max_elements: usize) -> Vec<SegmentTypeIdentifier> {
let mut return_vec = vec![];
if self.bitmap_cache.is_none() {
// Bitmap phase: request any bitmap segment extending past what the
// local accumulator already holds.
let local_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size();
let mut identifier_iter = SegmentIdentifier::traversal_iter(
self.bitmap_mmr_size,
self.default_bitmap_segment_height,
);
while let Some(id) = identifier_iter.next() {
// Only request segments whose end position is beyond local data.
if id.segment_pos_range(self.bitmap_mmr_size).1 > local_pmmr_size {
if !self.has_bitmap_segment_with_id(id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id));
if return_vec.len() >= max_elements {
return return_vec;
}
}
}
}
} else {
// Data phase: snapshot local MMR sizes under a short read lock.
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
// Outputs: request from the next required index up to the cache cap
// and a third of the element budget.
let total_output_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
let mut elems_added = 0;
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
while (next_output_idx as usize) < total_output_segments {
if self.output_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let output_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
let (_first, last) =
output_id.segment_pos_range(self.archive_header.output_mmr_size);
// Skip segments fully covered locally or already cached.
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
elems_added += 1;
}
next_output_idx += 1;
}
}
// Rangeproofs: same strategy. NOTE(review): the total is derived
// from `output_mmr_size` — presumably the rangeproof MMR mirrors
// the output MMR; confirm against the txhashset design.
let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);
elems_added = 0;
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
while (next_rp_idx as usize) < total_rangeproof_segments {
if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let rp_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
let (_first, last) =
rp_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_rangeproof_mmr_size
&& !self.has_rangeproof_segment_with_id(rp_id)
{
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
elems_added += 1;
}
next_rp_idx += 1;
}
}
// Kernels: same strategy against the kernel MMR size.
let total_kernel_segments = SegmentIdentifier::count_segments_required(
self.archive_header.kernel_mmr_size,
self.default_kernel_segment_height,
);
elems_added = 0;
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
while (next_kernel_idx as usize) < total_kernel_segments {
if self.kernel_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let k_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let (_first, last) =
k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
elems_added += 1;
}
next_kernel_idx += 1;
}
}
}
// Always ensure the very next required kernel segment is requested,
// evicting the last planned request if the budget is already full.
if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
if !self.has_kernel_segment_with_id(seg_id)
&& !return_vec.iter().any(|x| x == &next_kernel_seg_id)
{
if return_vec.len() >= max_elements {
return_vec.pop();
}
return_vec.push(next_kernel_seg_id);
}
}
// Nothing left to request and bitmap done => all segments complete.
if return_vec.is_empty() && self.bitmap_cache.is_some() {
self.all_segments_complete = true;
}
return_vec
}
/// Converts the fully-populated bitmap accumulator into the cached
/// output bitmap (used when validating output/rangeproof segments) and
/// installs the accumulator into the txhashset extension.
pub fn finalize_bitmap(&mut self) -> Result<(), Error> {
	trace!(
		"pibd_desegmenter: finalizing and caching bitmap - accumulator root: {}",
		self.bitmap_accumulator.root()
	);
	self.bitmap_cache = Some(self.bitmap_accumulator.as_bitmap()?);
	let mut header_pmmr = self.header_pmmr.write();
	let mut txhashset = self.txhashset.write();
	let mut batch = self.store.batch()?;
	txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _batch| {
		ext.extension
			.set_bitmap_accumulator(self.bitmap_accumulator.clone());
		Ok(())
	})?;
	Ok(())
}
/// Derives the expected leaf count and total size of the bitmap MMR
/// from the archive header's output MMR size.
fn calc_bitmap_mmr_sizes(&mut self) {
// Each bitmap chunk (one MMR leaf) covers 1024 outputs; round up.
self.bitmap_mmr_leaf_count =
(pmmr::n_leaves(self.archive_header.output_mmr_size) + 1023) / 1024;
trace!(
"pibd_desegmenter - expected number of leaves in bitmap MMR: {}",
self.bitmap_mmr_leaf_count
);
// MMR size = last peak position + 1. If `peaks` yields no last element
// at the insertion index of the final leaf, fall back to the peaks of
// the previous leaf's insertion index.
// NOTE(review): the fallback assumes bitmap_mmr_leaf_count >= 1 and
// that the fallback `peaks(..).last()` is always Some — the inner
// `unwrap()` would panic otherwise; presumably impossible for a valid
// archive header, but confirm.
self.bitmap_mmr_size =
1 + pmmr::peaks(pmmr::insertion_to_pmmr_index(self.bitmap_mmr_leaf_count))
.last()
.unwrap_or(
&(pmmr::peaks(pmmr::insertion_to_pmmr_index(
self.bitmap_mmr_leaf_count - 1,
))
.last()
.unwrap()),
)
.clone();
trace!(
"pibd_desegmenter - expected size of bitmap MMR: {}",
self.bitmap_mmr_size
);
}
/// Caches an incoming bitmap segment unless one with the same
/// identifier is already present (guards against duplicate deliveries).
fn cache_bitmap_segment(&mut self, in_seg: Segment<BitmapChunk>) {
	// `any` replaces the former `find(..).is_none()` (clippy: search_is_some).
	let already_cached = self
		.bitmap_segment_cache
		.iter()
		.any(|i| i.identifier() == in_seg.identifier());
	if !already_cached {
		self.bitmap_segment_cache.push(in_seg);
	}
}
/// Whether a bitmap segment with the given identifier is already cached.
fn has_bitmap_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
	// `any` replaces the former `find(..).is_some()` (clippy: search_is_some).
	self.bitmap_segment_cache
		.iter()
		.any(|i| i.identifier() == seg_id)
}
/// Index of the next bitmap segment still needed, or `None` once the
/// local bitmap accumulator has reached the expected MMR size.
fn next_required_bitmap_segment_index(&self) -> Option<u64> {
	let local_pmmr_size = self.bitmap_accumulator.readonly_pmmr().unpruned_size();
	// Segments fully covered by local data vs. segments needed in total.
	let have = SegmentIdentifier::count_segments_required(
		local_pmmr_size,
		self.default_bitmap_segment_height,
	);
	let want = SegmentIdentifier::count_segments_required(
		self.bitmap_mmr_size,
		self.default_bitmap_segment_height,
	);
	if have == want {
		None
	} else {
		Some(have as u64)
	}
}
/// Validates an incoming bitmap segment against the expected bitmap MMR
/// size and the archive header's output root (via `output_root_hash`),
/// then caches it for in-order application.
pub fn add_bitmap_segment(
	&mut self,
	segment: Segment<BitmapChunk>,
	output_root_hash: Hash,
) -> Result<(), Error> {
	trace!("pibd_desegmenter: add bitmap segment");
	segment.validate_with(
		self.bitmap_mmr_size,
		None,
		self.archive_header.output_root,
		self.archive_header.output_mmr_size,
		output_root_hash,
		true,
	)?;
	trace!("pibd_desegmenter: adding segment to cache");
	self.cache_bitmap_segment(segment);
	Ok(())
}
/// Removes the cached bitmap segment at cache position `idx` and feeds
/// each of its leaf chunks into the bitmap accumulator.
pub fn apply_bitmap_segment(&mut self, idx: usize) -> Result<(), Error> {
	let segment = self.bitmap_segment_cache.remove(idx);
	trace!(
		"pibd_desegmenter: apply bitmap segment at segment idx {}",
		segment.identifier().idx
	);
	// Only the leaf data matters here; hashes/proof were checked on receipt.
	let (_sid, _hash_pos, _hashes, _leaf_pos, leaf_data, _proof) = segment.parts();
	for chunk in leaf_data {
		self.bitmap_accumulator.append_chunk(chunk)?;
	}
	Ok(())
}
/// Whether an output segment with the given identifier is already cached.
fn has_output_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
	// `any` replaces the former `find(..).is_some()` (clippy: search_is_some).
	self.output_segment_cache
		.iter()
		.any(|i| i.identifier() == seg_id)
}
/// Caches an incoming output segment unless one with the same
/// identifier is already present (guards against duplicate deliveries).
fn cache_output_segment(&mut self, in_seg: Segment<OutputIdentifier>) {
	// `any` replaces the former `find(..).is_none()` (clippy: search_is_some).
	let already_cached = self
		.output_segment_cache
		.iter()
		.any(|i| i.identifier() == in_seg.identifier());
	if !already_cached {
		self.output_segment_cache.push(in_seg);
	}
}
/// Removes and returns up to `max_segments` consecutively-indexed
/// segments from `cache`, starting at `start_idx`. Stops at the first
/// missing index, so the result is always contiguous (possibly empty).
fn take_segment_batch<T>(
	cache: &mut Vec<Segment<T>>,
	start_idx: u64,
	max_segments: usize,
) -> Vec<Segment<T>> {
	let mut batch = Vec::new();
	for idx in start_idx.. {
		if batch.len() >= max_segments {
			break;
		}
		match cache.iter().position(|s| s.identifier().idx == idx) {
			Some(pos) => batch.push(cache.remove(pos)),
			// Gap in the sequence: the batch must stay contiguous.
			None => break,
		}
	}
	batch
}
/// Applies a contiguous batch of validated output segments to the
/// output PMMR within a single txhashset extension. A no-op on an
/// empty batch.
pub fn apply_output_segments(
	&mut self,
	segments: Vec<Segment<OutputIdentifier>>,
) -> Result<(), Error> {
	let first_idx = match segments.first() {
		Some(seg) => seg.identifier().idx,
		None => return Ok(()),
	};
	trace!(
		"pibd_desegmenter: applying {} output segment(s) starting at idx {}",
		segments.len(),
		first_idx
	);
	let mut header_pmmr = self.header_pmmr.write();
	let mut txhashset = self.txhashset.write();
	let mut batch = self.store.batch()?;
	txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _batch| {
		for segment in segments {
			ext.extension.apply_output_segment(segment)?;
		}
		Ok(())
	})?;
	Ok(())
}
/// Index of the next output segment to apply, or `None` once the local
/// output MMR has reached the archive header's size.
fn next_required_output_segment_index(&self) -> Option<u64> {
let local_output_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
}
// Size 1 means only the empty/initial state — no segment applied yet.
let mut cur_segment_count = if local_output_mmr_size == 1 {
0
} else {
SegmentIdentifier::count_segments_required(
local_output_mmr_size,
self.default_output_segment_height,
)
};
// If the local MMR is smaller than a whole number of segments, the
// last counted segment is only partially applied — back up one so it
// gets re-requested/re-applied in full.
let theoretical_pmmr_size =
SegmentIdentifier::pmmr_size(cur_segment_count, self.default_output_segment_height);
if local_output_mmr_size < theoretical_pmmr_size {
cur_segment_count -= 1;
}
let total_segment_count = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
trace!(
"Next required output segment is {} of {}",
cur_segment_count,
total_segment_count
);
if cur_segment_count == total_segment_count {
None
} else {
Some(cur_segment_count as u64)
}
}
/// Validates an incoming output segment against the archive header's
/// output root — using the finalized output bitmap (if any) and the
/// locally reconstructed accumulator root — then caches it.
///
/// NOTE(review): the `_bitmap_root` parameter is unused; validation
/// relies on the local accumulator root instead. Kept for caller
/// compatibility.
pub fn add_output_segment(
&mut self,
segment: Segment<OutputIdentifier>,
_bitmap_root: Option<Hash>,
) -> Result<(), Error> {
trace!("pibd_desegmenter: add output segment");
segment.validate_with(
self.archive_header.output_mmr_size, self.bitmap_cache.as_ref(),
self.archive_header.output_root, self.archive_header.output_mmr_size,
self.bitmap_accumulator.root(), false,
)?;
self.cache_output_segment(segment);
Ok(())
}
/// Whether a rangeproof segment with the given identifier is already cached.
fn has_rangeproof_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
	// `any` replaces the former `find(..).is_some()` (clippy: search_is_some).
	self.rangeproof_segment_cache
		.iter()
		.any(|i| i.identifier() == seg_id)
}
/// Caches an incoming rangeproof segment unless one with the same
/// identifier is already present (guards against duplicate deliveries).
fn cache_rangeproof_segment(&mut self, in_seg: Segment<RangeProof>) {
	// `any` replaces the former `find(..).is_none()` (clippy: search_is_some).
	let already_cached = self
		.rangeproof_segment_cache
		.iter()
		.any(|i| i.identifier() == in_seg.identifier());
	if !already_cached {
		self.rangeproof_segment_cache.push(in_seg);
	}
}
/// Applies a contiguous batch of validated rangeproof segments to the
/// rangeproof PMMR within a single txhashset extension. A no-op on an
/// empty batch.
pub fn apply_rangeproof_segments(
	&mut self,
	segments: Vec<Segment<RangeProof>>,
) -> Result<(), Error> {
	let first_idx = match segments.first() {
		Some(seg) => seg.identifier().idx,
		None => return Ok(()),
	};
	trace!(
		"pibd_desegmenter: applying {} rangeproof segment(s) starting at idx {}",
		segments.len(),
		first_idx
	);
	let mut header_pmmr = self.header_pmmr.write();
	let mut txhashset = self.txhashset.write();
	let mut batch = self.store.batch()?;
	txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _batch| {
		for segment in segments {
			ext.extension.apply_rangeproof_segment(segment)?;
		}
		Ok(())
	})?;
	Ok(())
}
/// Index of the next rangeproof segment to apply, or `None` once the
/// local rangeproof MMR is fully populated.
fn next_required_rangeproof_segment_index(&self) -> Option<u64> {
let local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
// Size 1 means only the empty/initial state — no segment applied yet.
let mut cur_segment_count = if local_rangeproof_mmr_size == 1 {
0
} else {
SegmentIdentifier::count_segments_required(
local_rangeproof_mmr_size,
self.default_rangeproof_segment_height,
)
};
// If the local MMR is smaller than a whole number of segments, the
// last counted segment is only partially applied — back up one.
let theoretical_pmmr_size =
SegmentIdentifier::pmmr_size(cur_segment_count, self.default_rangeproof_segment_height);
if local_rangeproof_mmr_size < theoretical_pmmr_size {
cur_segment_count -= 1;
}
// NOTE(review): totals derive from `output_mmr_size` — presumably the
// rangeproof MMR mirrors the output MMR leaf-for-leaf; confirm.
let total_segment_count = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);
trace!(
"Next required rangeproof segment is {} of {}",
cur_segment_count,
total_segment_count
);
if cur_segment_count == total_segment_count {
None
} else {
Some(cur_segment_count as u64)
}
}
/// Validates an incoming rangeproof segment against the archive
/// header's rangeproof root (using the finalized output bitmap, if
/// any), then caches it for later application.
pub fn add_rangeproof_segment(&mut self, segment: Segment<RangeProof>) -> Result<(), Error> {
	trace!("pibd_desegmenter: add rangeproof segment");
	segment.validate(
		self.archive_header.output_mmr_size,
		self.bitmap_cache.as_ref(),
		self.archive_header.range_proof_root,
	)?;
	self.cache_rangeproof_segment(segment);
	Ok(())
}
/// Whether a kernel segment with the given identifier is already cached.
fn has_kernel_segment_with_id(&self, seg_id: SegmentIdentifier) -> bool {
	// `any` replaces the former `find(..).is_some()` (clippy: search_is_some).
	self.kernel_segment_cache
		.iter()
		.any(|i| i.identifier() == seg_id)
}
/// Caches an incoming kernel segment unless one with the same
/// identifier is already present (guards against duplicate deliveries).
fn cache_kernel_segment(&mut self, in_seg: Segment<TxKernel>) {
	// `any` replaces the former `find(..).is_none()` (clippy: search_is_some).
	let already_cached = self
		.kernel_segment_cache
		.iter()
		.any(|i| i.identifier() == in_seg.identifier());
	if !already_cached {
		self.kernel_segment_cache.push(in_seg);
	}
}
/// Applies a contiguous batch of validated kernel segments to the
/// kernel PMMR within a single txhashset extension, logging each
/// success and any failure. A no-op on an empty batch.
pub fn apply_kernel_segments(&mut self, segments: Vec<Segment<TxKernel>>) -> Result<(), Error> {
	let first_idx = match segments.first() {
		Some(seg) => seg.identifier().idx,
		None => return Ok(()),
	};
	debug!(
		"pibd_desegmenter: applying {} kernel segment(s) starting at idx {}",
		segments.len(),
		first_idx
	);
	let mut header_pmmr = self.header_pmmr.write();
	let mut txhashset = self.txhashset.write();
	let mut batch = self.store.batch()?;
	txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _batch| {
		for segment in segments {
			let seg_idx = segment.identifier().idx;
			// Log the failing index before propagating the error.
			ext.extension.apply_kernel_segment(segment).map_err(|e| {
				error!(
					"pibd_desegmenter: failed to apply kernel segment idx {}: {}",
					seg_idx, e
				);
				e
			})?;
			debug!(
				"pibd_desegmenter: successfully applied kernel segment idx {}",
				seg_idx
			);
		}
		Ok(())
	})?;
	Ok(())
}
/// Index of the next kernel segment to apply, or `None` once the local
/// kernel MMR has reached the archive header's size.
fn next_required_kernel_segment_index(&self) -> Option<u64> {
let local_kernel_mmr_size;
{
let txhashset = self.txhashset.read();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
}
// Size 1 means only the empty/initial state — no segment applied yet.
let mut cur_segment_count = if local_kernel_mmr_size == 1 {
0
} else {
SegmentIdentifier::count_segments_required(
local_kernel_mmr_size,
self.default_kernel_segment_height,
)
};
// If the local MMR is smaller than a whole number of segments, the
// last counted segment is only partially applied — back up one.
let theoretical_pmmr_size =
SegmentIdentifier::pmmr_size(cur_segment_count, self.default_kernel_segment_height);
if local_kernel_mmr_size < theoretical_pmmr_size {
cur_segment_count -= 1;
}
let total_segment_count = SegmentIdentifier::count_segments_required(
self.archive_header.kernel_mmr_size,
self.default_kernel_segment_height,
);
trace!(
"Next required kernel segment is {} of {}",
cur_segment_count,
total_segment_count
);
if cur_segment_count == total_segment_count {
None
} else {
Some(cur_segment_count as u64)
}
}
/// Validates an incoming kernel segment against the archive header's
/// kernel root and caches it, logging receipt, validation failures and
/// successful caching.
pub fn add_kernel_segment(&mut self, segment: Segment<TxKernel>) -> Result<(), Error> {
	let idx = segment.identifier().idx;
	debug!(
		"pibd_desegmenter: received kernel segment idx {} (cache size {})",
		idx,
		self.kernel_segment_cache.len()
	);
	if let Err(e) = segment.validate(
		self.archive_header.kernel_mmr_size,
		None,
		self.archive_header.kernel_root,
	) {
		error!(
			"pibd_desegmenter: kernel segment idx {} failed validation: {}",
			idx, e
		);
		return Err(e);
	}
	self.cache_kernel_segment(segment);
	debug!(
		"pibd_desegmenter: cached kernel segment idx {} (cache size {})",
		idx,
		self.kernel_segment_cache.len()
	);
	Ok(())
}
}