use crate::blocktree_db::{
columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, WriteBatch,
};
pub use crate::blocktree_db::{BlocktreeError, Result};
pub use crate::blocktree_meta::SlotMeta;
use crate::blocktree_meta::*;
use crate::entry::{create_ticks, Entry};
use crate::erasure::ErasureConfig;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::shred::{Shred, Shredder};
use bincode::deserialize;
use log::*;
use rayon::iter::IntoParallelRefIterator;
use rayon::iter::ParallelIterator;
use rayon::ThreadPool;
use rocksdb::DBRawIterator;
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, datapoint_error};
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::clock::Slot;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::cell::RefCell;
use std::cmp;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::{Arc, Mutex, RwLock};
pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap()));
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
pub struct Blocktree {
db: Arc<Database>,
meta_cf: LedgerColumn<cf::SlotMeta>,
dead_slots_cf: LedgerColumn<cf::DeadSlots>,
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
orphans_cf: LedgerColumn<cf::Orphans>,
index_cf: LedgerColumn<cf::Index>,
data_shred_cf: LedgerColumn<cf::ShredData>,
code_shred_cf: LedgerColumn<cf::ShredCode>,
last_root: Arc<RwLock<u64>>,
insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>,
pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
}
pub struct IndexMetaWorkingSetEntry {
index: Index,
did_insert_occur: bool,
}
pub struct SlotMetaWorkingSetEntry {
new_slot_meta: Rc<RefCell<SlotMeta>>,
old_slot_meta: Option<SlotMeta>,
did_insert_occur: bool,
}
pub struct BlocktreeInsertionMetrics {
pub num_shreds: usize,
pub insert_lock_elapsed: u64,
pub insert_shreds_elapsed: u64,
pub shred_recovery_elapsed: u64,
pub chaining_elapsed: u64,
pub commit_working_sets_elapsed: u64,
pub write_batch_elapsed: u64,
pub total_elapsed: u64,
pub num_inserted: u64,
pub num_recovered: usize,
pub index_meta_time: u64,
}
impl SlotMetaWorkingSetEntry {
fn new(new_slot_meta: Rc<RefCell<SlotMeta>>, old_slot_meta: Option<SlotMeta>) -> Self {
Self {
new_slot_meta,
old_slot_meta,
did_insert_occur: false,
}
}
}
impl BlocktreeInsertionMetrics {
pub fn report_metrics(&self, metric_name: &'static str) {
datapoint_debug!(
metric_name,
("num_shreds", self.num_shreds as i64, i64),
("total_elapsed", self.total_elapsed as i64, i64),
("insert_lock_elapsed", self.insert_lock_elapsed as i64, i64),
(
"insert_shreds_elapsed",
self.insert_shreds_elapsed as i64,
i64
),
(
"shred_recovery_elapsed",
self.shred_recovery_elapsed as i64,
i64
),
("chaining_elapsed", self.chaining_elapsed as i64, i64),
(
"commit_working_sets_elapsed",
self.commit_working_sets_elapsed as i64,
i64
),
("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
("num_inserted", self.num_inserted as i64, i64),
("num_recovered", self.num_recovered as i64, i64),
);
}
}
impl Blocktree {
pub fn open(ledger_path: &Path) -> Result<Blocktree> {
fs::create_dir_all(&ledger_path)?;
let blocktree_path = ledger_path.join(BLOCKTREE_DIRECTORY);
let db = Database::open(&blocktree_path)?;
let meta_cf = db.column();
let dead_slots_cf = db.column();
let erasure_meta_cf = db.column();
let orphans_cf = db.column();
let index_cf = db.column();
let data_shred_cf = db.column();
let code_shred_cf = db.column();
let db = Arc::new(db);
let max_root = db
.iter::<cf::Root>(IteratorMode::End)?
.next()
.map(|(slot, _)| slot)
.unwrap_or(0);
let last_root = Arc::new(RwLock::new(max_root));
Ok(Blocktree {
db,
meta_cf,
dead_slots_cf,
erasure_meta_cf,
orphans_cf,
index_cf,
data_shred_cf,
code_shred_cf,
new_shreds_signals: vec![],
completed_slots_senders: vec![],
insert_shreds_lock: Arc::new(Mutex::new(())),
last_root,
})
}
pub fn open_with_signal(
ledger_path: &Path,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
let mut blocktree = Self::open(ledger_path)?;
let (signal_sender, signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
blocktree.new_shreds_signals = vec![signal_sender];
blocktree.completed_slots_senders = vec![completed_slots_sender];
Ok((blocktree, signal_receiver, completed_slots_receiver))
}
pub fn destroy(ledger_path: &Path) -> Result<()> {
fs::create_dir_all(ledger_path)?;
let blocktree_path = ledger_path.join(BLOCKTREE_DIRECTORY);
Database::destroy(&blocktree_path)
}
pub fn meta(&self, slot: u64) -> Result<Option<SlotMeta>> {
self.meta_cf.get(slot)
}
pub fn is_full(&self, slot: u64) -> bool {
if let Ok(meta) = self.meta_cf.get(slot) {
if let Some(meta) = meta {
return meta.is_full();
}
}
false
}
pub fn purge_slots(&self, mut from_slot: Slot, to_slot: Option<Slot>) {
const PURGE_BATCH_SIZE: u64 = 1000;
let mut batch_end = to_slot
.unwrap_or(from_slot + PURGE_BATCH_SIZE)
.min(from_slot + PURGE_BATCH_SIZE);
while from_slot < batch_end {
if let Ok(end) = self.run_purge_batch(from_slot, batch_end) {
if end {
break;
} else {
from_slot = batch_end;
batch_end = to_slot
.unwrap_or(batch_end + PURGE_BATCH_SIZE)
.min(batch_end + PURGE_BATCH_SIZE);
}
}
}
}
fn run_purge_batch(&self, from_slot: Slot, batch_end: Slot) -> Result<bool> {
let from_slot = Some(from_slot);
let batch_end = Some(batch_end);
let mut write_batch = self
.db
.batch()
.expect("Database Error: Failed to get write batch");
let end = self
.meta_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false)
&& self
.erasure_meta_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false)
&& self
.data_shred_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false)
&& self
.code_shred_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false)
&& self
.orphans_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false)
&& self
.index_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false)
&& self
.dead_slots_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false)
&& self
.db
.column::<cf::Root>()
.delete_slot(&mut write_batch, from_slot, batch_end)
.unwrap_or(false);
if let Err(e) = self.db.write(write_batch) {
error!(
"Error: {:?} while submitting write batch for slot {:?} retrying...",
e, from_slot
);
return Err(e);
}
Ok(end)
}
pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result<Option<ErasureMeta>> {
self.erasure_meta_cf.get((slot, set_index))
}
pub fn orphan(&self, slot: u64) -> Result<Option<bool>> {
self.orphans_cf.get(slot)
}
pub fn slot_meta_iterator<'a>(
&'a self,
slot: u64,
) -> Result<impl Iterator<Item = (u64, SlotMeta)> + 'a> {
let meta_iter = self
.db
.iter::<cf::SlotMeta>(IteratorMode::From(slot, IteratorDirection::Forward))?;
Ok(meta_iter.map(|(slot, slot_meta_bytes)| {
(
slot,
deserialize(&slot_meta_bytes)
.unwrap_or_else(|_| panic!("Could not deserialize SlotMeta for slot {}", slot)),
)
}))
}
pub fn slot_data_iterator<'a>(
&'a self,
slot: u64,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
let slot_iterator = self
.db
.iter::<cf::ShredData>(IteratorMode::From((slot, 0), IteratorDirection::Forward))?;
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
index_working_set: &HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
let mut recovered_data_shreds = vec![];
for (&(slot, set_index), erasure_meta) in erasure_metas.iter() {
let submit_metrics = |attempted: bool, status: String, recovered: usize| {
datapoint_debug!(
"blocktree-erasure",
("slot", slot as i64, i64),
("start_index", set_index as i64, i64),
("end_index", erasure_meta.end_indexes().0 as i64, i64),
("recovery_attempted", attempted, bool),
("recovery_status", status, String),
("recovered", recovered as i64, i64),
);
};
let index_meta_entry = index_working_set.get(&slot).expect("Index");
let index = &index_meta_entry.index;
match erasure_meta.status(&index) {
ErasureMetaStatus::CanRecover => {
let slot = index.slot;
let mut available_shreds = vec![];
(set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
if index.data().is_present(i) {
if let Some(shred) =
prev_inserted_datas.remove(&(slot, i)).or_else(|| {
let some_data = data_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
Shred::new_from_serialized_shred(data).ok()
} else {
warn!("Data shred deleted while reading for recovery");
None
}
})
{
available_shreds.push(shred);
}
}
});
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
|i| {
if index.coding().is_present(i) {
if let Some(shred) =
prev_inserted_codes.remove(&(slot, i)).or_else(|| {
let some_code = code_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch code shred");
if let Some(code) = some_code {
Shred::new_from_serialized_shred(code).ok()
} else {
warn!("Code shred deleted while reading for recovery");
None
}
})
{
available_shreds.push(shred);
}
}
},
);
if let Ok(mut result) = Shredder::try_recovery(
available_shreds,
erasure_meta.config.num_data(),
erasure_meta.config.num_coding(),
set_index as usize,
slot,
) {
submit_metrics(true, "complete".into(), result.len());
recovered_data_shreds.append(&mut result);
} else {
submit_metrics(true, "incomplete".into(), 0);
}
}
ErasureMetaStatus::DataFull => {
submit_metrics(false, "complete".into(), 0);
}
ErasureMetaStatus::StillNeed(needed) => {
submit_metrics(false, format!("still need: {}", needed), 0);
}
};
}
recovered_data_shreds
}
pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
) -> Result<BlocktreeInsertionMetrics> {
let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blocktree lock");
let _lock = self.insert_shreds_lock.lock().unwrap();
start.stop();
let insert_lock_elapsed = start.as_us();
let db = &*self.db;
let mut write_batch = db.batch()?;
let mut just_inserted_coding_shreds = HashMap::new();
let mut just_inserted_data_shreds = HashMap::new();
let mut erasure_metas = HashMap::new();
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
let num_shreds = shreds.len();
let mut start = Measure::start("Shred insertion");
let mut num_inserted = 0;
let mut index_meta_time = 0;
shreds.into_iter().for_each(|shred| {
let insert_success = {
if shred.is_data() {
self.check_insert_data_shred(
shred,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
)
} else if shred.is_code() {
self.check_insert_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut write_batch,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
)
} else {
panic!("There should be no other case");
}
};
if insert_success {
num_inserted += 1;
}
});
start.stop();
let insert_shreds_elapsed = start.as_us();
let mut start = Measure::start("Shred recovery");
let mut num_recovered = 0;
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data = Self::try_shred_recovery(
&db,
&erasure_metas,
&index_working_set,
&mut just_inserted_data_shreds,
&mut just_inserted_coding_shreds,
);
num_recovered = recovered_data.len();
recovered_data.into_iter().for_each(|shred| {
if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
if shred.verify(&leader) {
self.check_insert_data_shred(
shred,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
);
}
}
});
}
start.stop();
let shred_recovery_elapsed = start.as_us();
let mut start = Measure::start("Shred recovery");
handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
start.stop();
let chaining_elapsed = start.as_us();
let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
&slot_meta_working_set,
&self.completed_slots_senders,
&mut write_batch,
)?;
for ((slot, set_index), erasure_meta) in erasure_metas {
write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
}
for (&slot, index_working_set_entry) in index_working_set.iter() {
if index_working_set_entry.did_insert_occur {
write_batch.put::<cf::Index>(slot, &index_working_set_entry.index)?;
}
}
start.stop();
let commit_working_sets_elapsed = start.as_us();
let mut start = Measure::start("Write Batch");
self.db.write(write_batch)?;
start.stop();
let write_batch_elapsed = start.as_us();
if should_signal {
for signal in &self.new_shreds_signals {
let _ = signal.try_send(true);
}
}
send_signals(
&self.new_shreds_signals,
&self.completed_slots_senders,
should_signal,
newly_completed_slots,
)?;
total_start.stop();
Ok(BlocktreeInsertionMetrics {
num_shreds,
total_elapsed: total_start.as_us(),
insert_lock_elapsed,
insert_shreds_elapsed,
shred_recovery_elapsed,
chaining_elapsed,
commit_working_sets_elapsed,
write_batch_elapsed,
num_inserted,
num_recovered,
index_meta_time,
})
}
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
) -> bool {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
.map(|_| {
just_inserted_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
index_meta_working_set_entry.did_insert_occur = true;
})
.is_ok()
} else {
false
}
}
fn check_insert_data_shred(
&self,
shred: Shred,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
) -> bool {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
let slot_meta_entry =
get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();
if Blocktree::should_insert_data_shred(
&shred,
slot_meta,
index_meta.data(),
&self.last_root,
) {
if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
{
just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
true
} else {
false
}
} else {
false
}
}
fn should_insert_coding_shred(
shred: &Shred,
coding_index: &CodingIndex,
last_root: &RwLock<u64>,
) -> bool {
let slot = shred.slot();
let shred_index = shred.index();
if shred.is_data() || shred_index < u32::from(shred.coding_header.position) {
return false;
}
let set_index = shred_index - u32::from(shred.coding_header.position);
!(shred.coding_header.num_coding_shreds == 0
|| shred.coding_header.position >= shred.coding_header.num_coding_shreds
|| std::u32::MAX - set_index < u32::from(shred.coding_header.num_coding_shreds) - 1
|| coding_index.is_present(u64::from(shred_index))
|| slot <= *last_root.read().unwrap())
}
fn insert_coding_shred(
&self,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_meta: &mut Index,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<()> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
if shred.is_data() || shred_index < u64::from(shred.coding_header.position) {
error!("Due to earlier validation, shred index must be >= pos");
return Err(BlocktreeError::InvalidShredData(Box::new(
bincode::ErrorKind::Custom("shred index < pos".to_string()),
)));
}
let set_index = shred_index - u64::from(shred.coding_header.position);
let erasure_config = ErasureConfig::new(
shred.coding_header.num_data_shreds as usize,
shred.coding_header.num_coding_shreds as usize,
);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
self.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index, &erasure_config))
});
if erasure_config != erasure_meta.config {
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Stored config: {:#?}, new config: {:#?}",
erasure_meta.config, erasure_config
);
}
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
index_meta.coding_mut().set_present(shred_index, true);
Ok(())
}
fn should_insert_data_shred(
shred: &Shred,
slot_meta: &SlotMeta,
data_index: &DataIndex,
last_root: &RwLock<u64>,
) -> bool {
let shred_index = u64::from(shred.index());
let slot = shred.slot();
let last_in_slot = if shred.last_in_slot() {
debug!("got last in slot");
true
} else {
false
};
if shred_index < slot_meta.consumed || data_index.is_present(shred_index) {
return false;
}
let last_index = slot_meta.last_index;
if shred_index >= last_index {
datapoint_error!(
"blocktree_error",
(
"error",
format!(
"Received index {} >= slot.last_index {}",
shred_index, last_index
),
String
)
);
return false;
}
if last_in_slot && shred_index < slot_meta.received {
datapoint_error!(
"blocktree_error",
(
"error",
format!(
"Received shred_index {} < slot.received {}",
shred_index, slot_meta.received
),
String
)
);
return false;
}
let last_root = *last_root.read().unwrap();
verify_shred_slots(slot, slot_meta.parent_slot, last_root)
}
fn insert_data_shred(
&self,
slot_meta: &mut SlotMeta,
data_index: &mut DataIndex,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<()> {
let slot = shred.slot();
let index = u64::from(shred.index());
let last_in_slot = if shred.last_in_slot() {
debug!("got last in slot");
true
} else {
false
};
let last_in_data = if shred.data_complete() {
debug!("got last in data");
true
} else {
false
};
assert!(!is_orphan(slot_meta));
let data_cf = self.db.column::<cf::ShredData>();
let check_data_cf = |slot, index| {
data_cf
.get_bytes((slot, index))
.map(|opt| opt.is_some())
.unwrap_or(false)
};
let new_consumed = if slot_meta.consumed == index {
let mut current_index = index + 1;
while data_index.is_present(current_index) || check_data_cf(slot, current_index) {
current_index += 1;
}
current_index
} else {
slot_meta.consumed
};
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
update_slot_meta(
last_in_slot,
last_in_data,
slot_meta,
index as u32,
new_consumed,
);
data_index.set_present(index, true);
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(())
}
pub fn get_data_shred(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
self.data_shred_cf.get_bytes((slot, index))
}
pub fn get_data_shreds(
&self,
slot: u64,
from_index: u64,
to_index: u64,
buffer: &mut [u8],
) -> Result<(u64, usize)> {
let meta_cf = self.db.column::<cf::SlotMeta>();
let mut buffer_offset = 0;
let mut last_index = 0;
if let Some(meta) = meta_cf.get(slot)? {
if !meta.is_full() {
warn!("The slot is not yet full. Will not return any shreds");
return Ok((last_index, buffer_offset));
}
let to_index = cmp::min(to_index, meta.consumed);
for index in from_index..to_index {
if let Some(shred_data) = self.get_data_shred(slot, index)? {
let shred_len = shred_data.len();
if buffer.len().saturating_sub(buffer_offset) >= shred_len {
buffer[buffer_offset..buffer_offset + shred_len]
.copy_from_slice(&shred_data[..shred_len]);
buffer_offset += shred_len;
last_index = index;
if buffer.len().saturating_sub(buffer_offset) < shred_len {
break;
}
} else {
break;
}
}
}
}
Ok((last_index, buffer_offset))
}
pub fn get_coding_shred(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
self.code_shred_cf.get_bytes((slot, index))
}
pub fn write_entries(
&self,
start_slot: u64,
num_ticks_in_start_slot: u64,
start_index: u32,
ticks_per_slot: u64,
parent: Option<u64>,
is_full_slot: bool,
keypair: &Arc<Keypair>,
entries: Vec<Entry>,
) -> Result<usize> {
assert!(num_ticks_in_start_slot < ticks_per_slot);
let mut remaining_ticks_in_slot = ticks_per_slot - num_ticks_in_start_slot;
let mut current_slot = start_slot;
let mut parent_slot = parent.map_or(
if current_slot == 0 {
current_slot
} else {
current_slot - 1
},
|v| v,
);
let mut shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
.expect("Failed to create entry shredder");
let mut all_shreds = vec![];
let mut slot_entries = vec![];
for entry in entries.into_iter() {
if remaining_ticks_in_slot == 0 {
current_slot += 1;
parent_slot = current_slot - 1;
remaining_ticks_in_slot = ticks_per_slot;
let mut current_entries = vec![];
std::mem::swap(&mut slot_entries, &mut current_entries);
let start_index = {
if all_shreds.is_empty() {
start_index
} else {
0
}
};
let (mut data_shreds, mut coding_shreds, _) =
shredder.entries_to_shreds(¤t_entries, true, start_index);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
.expect("Failed to create entry shredder");
}
if entry.is_tick() {
remaining_ticks_in_slot -= 1;
}
slot_entries.push(entry);
}
if !slot_entries.is_empty() {
let (mut data_shreds, mut coding_shreds, _) =
shredder.entries_to_shreds(&slot_entries, is_full_slot, 0);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
}
let num_shreds = all_shreds.len();
self.insert_shreds(all_shreds, None)?;
Ok(num_shreds)
}
pub fn get_index(&self, slot: u64) -> Result<Option<Index>> {
self.index_cf.get(slot)
}
pub fn put_meta_bytes(&self, slot: u64, bytes: &[u8]) -> Result<()> {
self.meta_cf.put_bytes(slot, bytes)
}
fn find_missing_indexes<C>(
db_iterator: &mut DBRawIterator,
slot: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64>
where
C: Column<Index = (u64, u64)>,
{
if start_index >= end_index || max_missing == 0 {
return vec![];
}
let mut missing_indexes = vec![];
db_iterator.seek(&C::key((slot, start_index)));
let mut prev_index = start_index;
'outer: loop {
if !db_iterator.valid() {
for i in prev_index..end_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break;
}
}
break;
}
let (current_slot, index) = C::index(&db_iterator.key().expect("Expect a valid key"));
let current_index = {
if current_slot > slot {
end_index
} else {
index
}
};
let upper_index = cmp::min(current_index, end_index);
for i in prev_index..upper_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break 'outer;
}
}
if current_slot > slot {
break;
}
if current_index >= end_index {
break;
}
prev_index = current_index + 1;
db_iterator.next();
}
missing_indexes
}
pub fn find_missing_data_indexes(
&self,
slot: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
if let Ok(mut db_iterator) = self
.db
.raw_iterator_cf(self.db.cf_handle::<cf::ShredData>())
{
Self::find_missing_indexes::<cf::ShredData>(
&mut db_iterator,
slot,
start_index,
end_index,
max_missing,
)
} else {
vec![]
}
}
pub fn get_slot_entries(
&self,
slot: u64,
shred_start_index: u64,
_max_entries: Option<u64>,
) -> Result<Vec<Entry>> {
self.get_slot_entries_with_shred_count(slot, shred_start_index)
.map(|x| x.0)
}
pub fn get_slot_entries_with_shred_count(
&self,
slot: u64,
start_index: u64,
) -> Result<(Vec<Entry>, usize)> {
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = slot_meta_cf.get(slot)?;
if slot_meta.is_none() {
return Ok((vec![], 0));
}
let slot_meta = slot_meta.unwrap();
let completed_ranges = Self::get_completed_data_ranges(
start_index as u32,
&slot_meta.completed_data_indexes[..],
slot_meta.consumed as u32,
);
if completed_ranges.is_empty() {
return Ok((vec![], 0));
}
let num_shreds = completed_ranges
.last()
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1);
let all_entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
completed_ranges
.par_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(slot, *start_index, *end_index)
})
.collect()
})
});
let all_entries: Vec<Entry> = all_entries?.into_iter().flatten().collect();
Ok((all_entries, num_shreds.unwrap_or(0) as usize))
}
fn get_completed_data_ranges(
mut start_index: u32,
completed_data_end_indexes: &[u32],
consumed: u32,
) -> Vec<(u32, u32)> {
let mut completed_data_ranges = vec![];
let floor = completed_data_end_indexes
.iter()
.position(|i| *i >= start_index)
.unwrap_or_else(|| completed_data_end_indexes.len());
for i in &completed_data_end_indexes[floor as usize..] {
assert!(*i != consumed);
if *i < consumed {
completed_data_ranges.push((start_index, *i));
start_index = *i + 1;
}
}
completed_data_ranges
}
fn get_entries_in_data_block(
&self,
slot: u64,
start_index: u32,
end_index: u32,
) -> Result<Vec<Entry>> {
let data_shred_cf = self.db.column::<cf::ShredData>();
let data_shreds: Result<Vec<Shred>> = (start_index..=end_index)
.map(|i| {
data_shred_cf
.get_bytes((slot, u64::from(i)))
.and_then(|serialized_shred| {
Shred::new_from_serialized_shred(
serialized_shred
.expect("Shred must exist if shred index was included in a range"),
)
.map_err(|_| {
BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
"Could not reconstruct shred from shred payload".to_string(),
)))
})
})
})
.collect();
let data_shreds = data_shreds?;
assert!(data_shreds.last().unwrap().data_complete());
let deshred_payload = Shredder::deshred(&data_shreds).map_err(|_| {
BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
"Could not reconstruct data block from constituent shreds".to_string(),
)))
})?;
debug!("{:?} shreds in last FEC set", data_shreds.len(),);
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|_| {
BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
"could not reconstruct entries".to_string(),
)))
})
}
pub fn get_slots_since(&self, slots: &[u64]) -> Result<HashMap<u64, Vec<u64>>> {
let slot_metas: Result<Vec<Option<SlotMeta>>> =
slots.iter().map(|slot| self.meta(*slot)).collect();
let slot_metas = slot_metas?;
let result: HashMap<u64, Vec<u64>> = slots
.iter()
.zip(slot_metas)
.filter_map(|(height, meta)| {
meta.map(|meta| {
let valid_next_slots: Vec<u64> = meta
.next_slots
.iter()
.cloned()
.filter(|s| !self.is_dead(*s))
.collect();
(*height, valid_next_slots)
})
})
.collect();
Ok(result)
}
pub fn is_root(&self, slot: u64) -> bool {
if let Ok(Some(true)) = self.db.get::<cf::Root>(slot) {
true
} else {
false
}
}
pub fn set_roots(&self, rooted_slots: &[u64]) -> Result<()> {
let mut write_batch = self.db.batch()?;
for slot in rooted_slots {
write_batch.put::<cf::Root>(*slot, &true)?;
}
self.db.write(write_batch)?;
let mut last_root = self.last_root.write().unwrap();
if *last_root == std::u64::MAX {
*last_root = 0;
}
*last_root = cmp::max(*rooted_slots.iter().max().unwrap(), *last_root);
Ok(())
}
pub fn is_dead(&self, slot: u64) -> bool {
if let Some(true) = self
.db
.get::<cf::DeadSlots>(slot)
.expect("fetch from DeadSlots column family failed")
{
true
} else {
false
}
}
pub fn set_dead_slot(&self, slot: u64) -> Result<()> {
self.dead_slots_cf.put(slot, &true)
}
pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
let mut results = vec![];
let mut iter = self
.db
.raw_iterator_cf(self.db.cf_handle::<cf::Orphans>())
.unwrap();
iter.seek_to_first();
while iter.valid() {
if let Some(max) = max {
if results.len() > max {
break;
}
}
results.push(<cf::Orphans as Column>::index(&iter.key().unwrap()));
iter.next();
}
results
}
pub fn prune(&self, target_slot: u64) {
let mut meta = self
.meta(target_slot)
.expect("couldn't read slot meta")
.expect("no meta for target slot");
meta.next_slots.clear();
self.put_meta_bytes(
target_slot,
&bincode::serialize(&meta).expect("couldn't get meta bytes"),
)
.expect("unable to update meta for target slot");
self.purge_slots(target_slot + 1, None);
for (slot, mut meta) in self
.slot_meta_iterator(0)
.expect("unable to iterate over meta")
{
if slot > target_slot {
break;
}
meta.next_slots.retain(|slot| *slot <= target_slot);
self.put_meta_bytes(
slot,
&bincode::serialize(&meta).expect("couldn't update meta"),
)
.expect("couldn't update meta");
}
}
pub fn last_root(&self) -> u64 {
*self.last_root.read().unwrap()
}
}
fn update_slot_meta(
is_last_in_slot: bool,
is_last_in_data: bool,
slot_meta: &mut SlotMeta,
index: u32,
new_consumed: u64,
) {
slot_meta.received = cmp::max((u64::from(index) + 1) as u64, slot_meta.received);
slot_meta.consumed = new_consumed;
slot_meta.last_index = {
if slot_meta.last_index == std::u64::MAX {
if is_last_in_slot {
u64::from(index)
} else {
std::u64::MAX
}
} else {
slot_meta.last_index
}
};
if is_last_in_slot || is_last_in_data {
let position = slot_meta
.completed_data_indexes
.iter()
.position(|completed_data_index| *completed_data_index > index)
.unwrap_or_else(|| slot_meta.completed_data_indexes.len());
slot_meta.completed_data_indexes.insert(position, index);
}
}
fn get_index_meta_entry<'a>(
db: &Database,
slot: u64,
index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
index_meta_time: &mut u64,
) -> &'a mut IndexMetaWorkingSetEntry {
let index_cf = db.column::<cf::Index>();
let mut total_start = Measure::start("Total elapsed");
let res = index_working_set.entry(slot).or_insert_with(|| {
let newly_inserted_meta = index_cf
.get(slot)
.unwrap()
.unwrap_or_else(|| Index::new(slot));
IndexMetaWorkingSetEntry {
index: newly_inserted_meta,
did_insert_occur: false,
}
});
total_start.stop();
*index_meta_time += total_start.as_us();
res
}
fn get_slot_meta_entry<'a>(
db: &Database,
slot_meta_working_set: &'a mut HashMap<u64, SlotMetaWorkingSetEntry>,
slot: u64,
parent_slot: u64,
) -> &'a mut SlotMetaWorkingSetEntry {
let meta_cf = db.column::<cf::SlotMeta>();
slot_meta_working_set.entry(slot).or_insert_with(|| {
if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
let backup = Some(meta.clone());
if is_orphan(&meta) {
meta.parent_slot = parent_slot;
}
SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup)
} else {
SlotMetaWorkingSetEntry::new(
Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))),
None,
)
}
})
}
fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool {
slot_to_write == 0 && last_root == 0 && parent_slot == 0
}
fn send_signals(
new_shreds_signals: &[SyncSender<bool>],
completed_slots_senders: &[SyncSender<Vec<u64>>],
should_signal: bool,
newly_completed_slots: Vec<u64>,
) -> Result<()> {
if should_signal {
for signal in new_shreds_signals {
let _ = signal.try_send(true);
}
}
if !completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() {
let mut slots: Vec<_> = (0..completed_slots_senders.len() - 1)
.map(|_| newly_completed_slots.clone())
.collect();
slots.push(newly_completed_slots);
for (signal, slots) in completed_slots_senders.iter().zip(slots.into_iter()) {
let res = signal.try_send(slots);
if let Err(TrySendError::Full(_)) = res {
datapoint_error!(
"blocktree_error",
(
"error",
"Unable to send newly completed slot because channel is full".to_string(),
String
),
);
}
}
}
Ok(())
}
fn commit_slot_meta_working_set(
slot_meta_working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
completed_slots_senders: &[SyncSender<Vec<u64>>],
write_batch: &mut WriteBatch,
) -> Result<(bool, Vec<u64>)> {
let mut should_signal = false;
let mut newly_completed_slots = vec![];
for (slot, slot_meta_entry) in slot_meta_working_set.iter() {
assert!(slot_meta_entry.did_insert_occur);
let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta);
let meta_backup = &slot_meta_entry.old_slot_meta;
if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) {
newly_completed_slots.push(*slot);
}
if Some(meta) != meta_backup.as_ref() {
should_signal = should_signal || slot_has_updates(meta, &meta_backup);
write_batch.put::<cf::SlotMeta>(*slot, &meta)?;
}
}
Ok((should_signal, newly_completed_slots))
}
fn find_slot_meta_else_create<'a>(
db: &Database,
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
chained_slots: &'a mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot_index: u64,
) -> Result<Rc<RefCell<SlotMeta>>> {
let result = find_slot_meta_in_cached_state(working_set, chained_slots, slot_index)?;
if let Some(slot) = result {
Ok(slot)
} else {
find_slot_meta_in_db_else_create(db, slot_index, chained_slots)
}
}
fn find_slot_meta_in_db_else_create<'a>(
db: &Database,
slot: u64,
insert_map: &'a mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
) -> Result<Rc<RefCell<SlotMeta>>> {
if let Some(slot_meta) = db.column::<cf::SlotMeta>().get(slot)? {
insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
Ok(insert_map.get(&slot).unwrap().clone())
} else {
insert_map.insert(
slot,
Rc::new(RefCell::new(SlotMeta::new(slot, std::u64::MAX))),
);
Ok(insert_map.get(&slot).unwrap().clone())
}
}
fn find_slot_meta_in_cached_state<'a>(
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
chained_slots: &'a HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot: u64,
) -> Result<Option<Rc<RefCell<SlotMeta>>>> {
if let Some(entry) = working_set.get(&slot) {
Ok(Some(entry.new_slot_meta.clone()))
} else if let Some(entry) = chained_slots.get(&slot) {
Ok(Some(entry.clone()))
} else {
Ok(None)
}
}
fn handle_chaining(
db: &Database,
write_batch: &mut WriteBatch,
working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
) -> Result<()> {
working_set.retain(|_, entry| entry.did_insert_occur);
let mut new_chained_slots = HashMap::new();
let working_set_slots: Vec<_> = working_set.keys().collect();
for slot in working_set_slots {
handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, *slot)?;
}
for (slot, meta) in new_chained_slots.iter() {
let meta: &SlotMeta = &RefCell::borrow(&*meta);
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
}
Ok(())
}
fn handle_chaining_for_slot(
db: &Database,
write_batch: &mut WriteBatch,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot: u64,
) -> Result<()> {
let slot_meta_entry = working_set
.get(&slot)
.expect("Slot must exist in the working_set hashmap");
let meta = &slot_meta_entry.new_slot_meta;
let meta_backup = &slot_meta_entry.old_slot_meta;
{
let mut meta_mut = meta.borrow_mut();
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
if slot != 0 {
let prev_slot = meta_mut.parent_slot;
if meta_backup.is_none() || was_orphan_slot {
let prev_slot_meta =
find_slot_meta_else_create(db, working_set, new_chained_slots, prev_slot)?;
chain_new_slot_to_prev_slot(&mut prev_slot_meta.borrow_mut(), slot, &mut meta_mut);
if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
}
}
}
if was_orphan_slot {
write_batch.delete::<cf::Orphans>(slot)?;
}
}
let should_propagate_is_connected =
is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup)
&& RefCell::borrow(&*meta).is_connected;
if should_propagate_is_connected {
let slot_function = |slot: &mut SlotMeta| {
slot.is_connected = true;
slot.is_full()
};
traverse_children_mut(
db,
slot,
&meta,
working_set,
new_chained_slots,
slot_function,
)?;
}
Ok(())
}
fn traverse_children_mut<F>(
db: &Database,
slot: u64,
slot_meta: &Rc<RefCell<(SlotMeta)>>,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot_function: F,
) -> Result<()>
where
F: Fn(&mut SlotMeta) -> bool,
{
let mut next_slots: Vec<(u64, Rc<RefCell<(SlotMeta)>>)> = vec![(slot, slot_meta.clone())];
while !next_slots.is_empty() {
let (_, current_slot) = next_slots.pop().unwrap();
if slot_function(&mut current_slot.borrow_mut()) {
let current_slot = &RefCell::borrow(&*current_slot);
for next_slot_index in current_slot.next_slots.iter() {
let next_slot = find_slot_meta_else_create(
db,
working_set,
new_chained_slots,
*next_slot_index,
)?;
next_slots.push((*next_slot_index, next_slot));
}
}
}
Ok(())
}
fn is_orphan(meta: &SlotMeta) -> bool {
!meta.is_parent_set()
}
fn chain_new_slot_to_prev_slot(
prev_slot_meta: &mut SlotMeta,
current_slot: u64,
current_slot_meta: &mut SlotMeta,
) {
prev_slot_meta.next_slots.push(current_slot);
current_slot_meta.is_connected = prev_slot_meta.is_connected && prev_slot_meta.is_full();
}
fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotMeta>) -> bool {
slot_meta.is_full()
&& (backup_slot_meta.is_none()
|| slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
}
fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option<SlotMeta>) -> bool {
slot_meta.is_connected &&
((slot_meta_backup.is_none() && slot_meta.consumed != 0) ||
(slot_meta_backup.is_some() && slot_meta_backup.as_ref().unwrap().consumed != slot_meta.consumed))
}
pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Result<Hash> {
let ticks_per_slot = genesis_block.ticks_per_slot;
Blocktree::destroy(ledger_path)?;
genesis_block.write(&ledger_path)?;
let blocktree = Blocktree::open(ledger_path)?;
let entries = create_ticks(ticks_per_slot, genesis_block.hash());
let last_hash = entries.last().unwrap().hash;
let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()))
.expect("Failed to create entry shredder");
let shreds = shredder.entries_to_shreds(&entries, true, 0).0;
assert!(shreds.last().unwrap().last_in_slot());
blocktree.insert_shreds(shreds, None)?;
blocktree.set_roots(&[0])?;
Ok(last_hash)
}
#[macro_export]
macro_rules! tmp_ledger_name {
() => {
&format!("{}-{}", file!(), line!())
};
}
#[macro_export]
macro_rules! get_tmp_ledger_path {
() => {
get_tmp_ledger_path(tmp_ledger_name!())
};
}
pub fn get_tmp_ledger_path(name: &str) -> PathBuf {
use std::env;
let out_dir = env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
let keypair = Keypair::new();
let path = [
out_dir,
"ledger".to_string(),
format!("{}-{}", name, keypair.pubkey()),
]
.iter()
.collect();
let _ignored = fs::remove_dir_all(&path);
path
}
#[macro_export]
macro_rules! create_new_tmp_ledger {
($genesis_block:expr) => {
create_new_tmp_ledger(tmp_ledger_name!(), $genesis_block)
};
}
pub fn verify_shred_slots(slot: u64, parent_slot: u64, last_root: u64) -> bool {
if !is_valid_write_to_slot_0(slot, parent_slot, last_root) {
if parent_slot >= slot {
return false;
}
if parent_slot < last_root {
return false;
}
}
true
}
pub fn create_new_tmp_ledger(name: &str, genesis_block: &GenesisBlock) -> (PathBuf, Hash) {
let ledger_path = get_tmp_ledger_path(name);
let blockhash = create_new_ledger(&ledger_path, genesis_block).unwrap();
(ledger_path, blockhash)
}
pub fn entries_to_test_shreds(
entries: Vec<Entry>,
slot: u64,
parent_slot: u64,
is_full_slot: bool,
) -> Vec<Shred> {
let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()))
.expect("Failed to create entry shredder");
shredder.entries_to_shreds(&entries, is_full_slot, 0).0
}
pub fn make_slot_entries(
slot: u64,
parent_slot: u64,
num_entries: u64,
) -> (Vec<Shred>, Vec<Entry>) {
let entries = create_ticks(num_entries, Hash::default());
let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true);
(shreds, entries)
}
pub fn make_many_slot_entries(
start_slot: u64,
num_slots: u64,
entries_per_slot: u64,
) -> (Vec<Shred>, Vec<Entry>) {
let mut shreds = vec![];
let mut entries = vec![];
for slot in start_slot..start_slot + num_slots {
let parent_slot = if slot == 0 { 0 } else { slot - 1 };
let (slot_shreds, slot_entries) = make_slot_entries(slot, parent_slot, entries_per_slot);
shreds.extend(slot_shreds);
entries.extend(slot_entries);
}
(shreds, entries)
}
pub fn make_chaining_slot_entries(
chain: &[u64],
entries_per_slot: u64,
) -> Vec<(Vec<Shred>, Vec<Entry>)> {
let mut slots_shreds_and_entries = vec![];
for (i, slot) in chain.iter().enumerate() {
let parent_slot = {
if *slot == 0 || i == 0 {
0
} else {
chain[i - 1]
}
};
let result = make_slot_entries(*slot, parent_slot, entries_per_slot);
slots_shreds_and_entries.push(result);
}
slots_shreds_and_entries
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use crate::shred::{max_ticks_per_n_shreds, DataShredHeader};
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;
use solana_sdk::hash::Hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
use std::iter::FromIterator;
use std::time::Duration;
#[test]
fn test_create_new_ledger() {
let mint_total = 1_000_000_000_000;
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(mint_total);
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
let ledger = Blocktree::open(&ledger_path).unwrap();
let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
let entries = ledger.get_slot_entries(0, 0, None).unwrap();
assert_eq!(ticks, entries);
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_get_bytes() {
let num_entries = max_ticks_per_n_shreds(1) + 1;
assert!(num_entries > 1);
let (mut shreds, _) = make_slot_entries(0, 0, num_entries);
let ledger_path = get_tmp_ledger_path("test_insert_data_shreds_basic");
let ledger = Blocktree::open(&ledger_path).unwrap();
let last_shred = shreds.pop().unwrap();
assert!(last_shred.index() > 0);
ledger
.insert_shreds(vec![last_shred.clone()], None)
.unwrap();
let serialized_shred = ledger
.data_shred_cf
.get_bytes((0, last_shred.index() as u64))
.unwrap()
.unwrap();
let deserialized_shred = Shred::new_from_serialized_shred(serialized_shred).unwrap();
assert_eq!(last_shred, deserialized_shred);
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_write_entries() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
let ticks_per_slot = 10;
let num_slots = 10;
let ledger = Blocktree::open(&ledger_path).unwrap();
let mut ticks = vec![];
let mut shreds_per_slot = vec![];
for i in 0..num_slots {
let mut new_ticks = create_ticks(ticks_per_slot, Hash::default());
let num_shreds = ledger
.write_entries(
i,
0,
0,
ticks_per_slot,
Some(i.saturating_sub(1)),
true,
&Arc::new(Keypair::new()),
new_ticks.clone(),
)
.unwrap() as u64;
shreds_per_slot.push(num_shreds);
ticks.append(&mut new_ticks);
}
for i in 0..num_slots {
let meta = ledger.meta(i).unwrap().unwrap();
let num_shreds = shreds_per_slot[i as usize];
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.last_index, num_shreds - 1);
if i == num_slots - 1 {
assert!(meta.next_slots.is_empty());
} else {
assert_eq!(meta.next_slots, vec![i + 1]);
}
if i == 0 {
assert_eq!(meta.parent_slot, 0);
} else {
assert_eq!(meta.parent_slot, i - 1);
}
assert_eq!(
&ticks[(i * ticks_per_slot) as usize..((i + 1) * ticks_per_slot) as usize],
&ledger.get_slot_entries(i, 0, None).unwrap()[..]
);
}
}
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_put_get_simple() {
let ledger_path = get_tmp_ledger_path("test_put_get_simple");
let ledger = Blocktree::open(&ledger_path).unwrap();
let meta = SlotMeta::new(0, 1);
ledger.meta_cf.put(0, &meta).unwrap();
let result = ledger
.meta_cf
.get(0)
.unwrap()
.expect("Expected meta object to exist");
assert_eq!(result, meta);
let erasure = vec![1u8; 16];
let erasure_key = (0, 0);
ledger
.code_shred_cf
.put_bytes(erasure_key, &erasure)
.unwrap();
let result = ledger
.code_shred_cf
.get_bytes(erasure_key)
.unwrap()
.expect("Expected erasure object to exist");
assert_eq!(result, erasure);
let data = vec![2u8; 16];
let data_key = (0, 0);
ledger.data_shred_cf.put_bytes(data_key, &data).unwrap();
let result = ledger
.data_shred_cf
.get_bytes(data_key)
.unwrap()
.expect("Expected data object to exist");
assert_eq!(result, data);
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_read_shred_bytes() {
let slot = 0;
let (shreds, _) = make_slot_entries(slot, 0, 100);
let num_shreds = shreds.len() as u64;
let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.payload.clone()).collect();
let ledger_path = get_tmp_ledger_path("test_read_shreds_bytes");
let ledger = Blocktree::open(&ledger_path).unwrap();
ledger.insert_shreds(shreds, None).unwrap();
let mut buf = [0; 4096];
let (_, bytes) = ledger.get_data_shreds(slot, 0, 1, &mut buf).unwrap();
assert_eq!(buf[..bytes], shred_bufs[0][..bytes]);
let (last_index, bytes2) = ledger.get_data_shreds(slot, 0, 2, &mut buf).unwrap();
assert_eq!(last_index, 1);
assert!(bytes2 > bytes);
{
let shred_data_1 = &buf[..bytes];
assert_eq!(shred_data_1, &shred_bufs[0][..bytes]);
let shred_data_2 = &buf[bytes..bytes2];
assert_eq!(shred_data_2, &shred_bufs[1][..bytes2 - bytes]);
}
let mut buf = vec![0; bytes + 1];
let (last_index, bytes3) = ledger.get_data_shreds(slot, 0, 2, &mut buf).unwrap();
assert_eq!(last_index, 0);
assert_eq!(bytes3, bytes);
let mut buf = vec![0; bytes2 - 1];
let (last_index, bytes4) = ledger.get_data_shreds(slot, 0, 2, &mut buf).unwrap();
assert_eq!(last_index, 0);
assert_eq!(bytes4, bytes);
let mut buf = vec![0; bytes * 2];
let (last_index, bytes6) = ledger
.get_data_shreds(slot, num_shreds - 1, num_shreds, &mut buf)
.unwrap();
assert_eq!(last_index, num_shreds - 1);
{
let shred_data = &buf[..bytes6];
assert_eq!(shred_data, &shred_bufs[(num_shreds - 1) as usize][..bytes6]);
}
let (last_index, bytes6) = ledger
.get_data_shreds(slot, num_shreds, num_shreds + 2, &mut buf)
.unwrap();
assert_eq!(last_index, 0);
assert_eq!(bytes6, 0);
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_data_shreds_basic() {
let num_entries = max_ticks_per_n_shreds(1) + 1;
assert!(num_entries > 1);
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
let num_shreds = shreds.len() as u64;
let ledger_path = get_tmp_ledger_path("test_insert_data_shreds_basic");
let ledger = Blocktree::open(&ledger_path).unwrap();
assert!(shreds.len() > 1);
let last_shred = shreds.pop().unwrap();
ledger.insert_shreds(vec![last_shred], None).unwrap();
assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty());
let meta = ledger
.meta(0)
.unwrap()
.expect("Expected new metadata object to be created");
assert!(meta.consumed == 0 && meta.received == num_shreds);
ledger.insert_shreds(shreds, None).unwrap();
let result = ledger.get_slot_entries(0, 0, None).unwrap();
assert_eq!(result, entries);
let meta = ledger
.meta(0)
.unwrap()
.expect("Expected new metadata object to exist");
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_shreds - 1);
assert!(meta.next_slots.is_empty());
assert!(meta.is_connected);
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_data_shreds_reverse() {
let num_shreds = 10;
let num_entries = max_ticks_per_n_shreds(num_shreds);
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
let num_shreds = shreds.len() as u64;
let ledger_path = get_tmp_ledger_path("test_insert_data_shreds_reverse");
let ledger = Blocktree::open(&ledger_path).unwrap();
for i in (0..num_shreds).rev() {
let shred = shreds.pop().unwrap();
ledger.insert_shreds(vec![shred], None).unwrap();
let result = ledger.get_slot_entries(0, 0, None).unwrap();
let meta = ledger
.meta(0)
.unwrap()
.expect("Expected metadata object to exist");
assert_eq!(meta.last_index, num_shreds - 1);
if i != 0 {
assert_eq!(result.len(), 0);
assert!(meta.consumed == 0 && meta.received == num_shreds as u64);
} else {
assert_eq!(meta.parent_slot, 0);
assert_eq!(result, entries);
assert!(meta.consumed == num_shreds as u64 && meta.received == num_shreds as u64);
}
}
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_slots() {
test_insert_data_shreds_slots("test_insert_data_shreds_slots_single", false);
test_insert_data_shreds_slots("test_insert_data_shreds_slots_bulk", true);
}
#[test]
pub fn test_get_slot_entries1() {
let blocktree_path = get_tmp_ledger_path("test_get_slot_entries1");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let entries = create_ticks(8, Hash::default());
let shreds = entries_to_test_shreds(entries[0..4].to_vec(), 1, 0, false);
blocktree
.insert_shreds(shreds, None)
.expect("Expected successful write of shreds");
let mut shreds1 = entries_to_test_shreds(entries[4..].to_vec(), 1, 0, false);
for (i, b) in shreds1.iter_mut().enumerate() {
b.set_index(8 + i as u32);
}
blocktree
.insert_shreds(shreds1, None)
.expect("Expected successful write of shreds");
assert_eq!(
blocktree.get_slot_entries(1, 0, None).unwrap()[2..4],
entries[2..4],
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
#[ignore]
pub fn test_get_slot_entries2() {
let blocktree_path = get_tmp_ledger_path("test_get_slot_entries2");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 5 as u64;
let mut index = 0;
for slot in 0..num_slots {
let entries = create_ticks(slot + 1, Hash::default());
let last_entry = entries.last().unwrap().clone();
let mut shreds =
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), false);
for b in shreds.iter_mut() {
b.set_index(index);
b.set_slot(slot as u64);
index += 1;
}
blocktree
.insert_shreds(shreds, None)
.expect("Expected successful write of shreds");
assert_eq!(
blocktree
.get_slot_entries(slot, u64::from(index - 1), None)
.unwrap(),
vec![last_entry],
);
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_get_slot_entries3() {
let blocktree_path = get_tmp_ledger_path("test_get_slot_entries3");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 5 as u64;
let shreds_per_slot = 5 as u64;
let entry_serialized_size =
bincode::serialized_size(&create_ticks(1, Hash::default())).unwrap();
let entries_per_slot =
(shreds_per_slot * PACKET_DATA_SIZE as u64) / entry_serialized_size;
for slot in 0..num_slots {
let entries = create_ticks(entries_per_slot, Hash::default());
let shreds =
entries_to_test_shreds(entries.clone(), slot, slot.saturating_sub(1), false);
assert!(shreds.len() as u64 >= shreds_per_slot);
blocktree
.insert_shreds(shreds, None)
.expect("Expected successful write of shreds");
assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), entries);
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_shreds_consecutive() {
let blocktree_path = get_tmp_ledger_path("test_insert_data_shreds_consecutive");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let min_entries = max_ticks_per_n_shreds(1) + 1;
for i in 0..4 {
let slot = i;
let parent_slot = if i == 0 { 0 } else { i - 1 };
let num_entries = min_entries * (i + 1);
let (shreds, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
let num_shreds = shreds.len() as u64;
assert!(num_shreds > 1);
let mut even_shreds = vec![];
let mut odd_shreds = vec![];
for (i, shred) in shreds.into_iter().enumerate() {
if i % 2 == 0 {
even_shreds.push(shred);
} else {
odd_shreds.push(shred);
}
}
blocktree.insert_shreds(odd_shreds, None).unwrap();
assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]);
let meta = blocktree.meta(slot).unwrap().unwrap();
if num_shreds % 2 == 0 {
assert_eq!(meta.received, num_shreds);
} else {
trace!("got here");
assert_eq!(meta.received, num_shreds - 1);
}
assert_eq!(meta.consumed, 0);
if num_shreds % 2 == 0 {
assert_eq!(meta.last_index, num_shreds - 1);
} else {
assert_eq!(meta.last_index, std::u64::MAX);
}
blocktree.insert_shreds(even_shreds, None).unwrap();
assert_eq!(
blocktree.get_slot_entries(slot, 0, None).unwrap(),
original_entries,
);
let meta = blocktree.meta(slot).unwrap().unwrap();
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.parent_slot, parent_slot);
assert_eq!(meta.last_index, num_shreds - 1);
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_shreds_duplicate() {
let blocktree_path = get_tmp_ledger_path("test_insert_data_shreds_duplicate");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_unique_entries = 10;
let (mut original_shreds, original_entries) =
make_slot_entries(0, 0, num_unique_entries);
original_shreds.remove(0);
blocktree.insert_shreds(original_shreds, None).unwrap();
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true);
let num_shreds = duplicate_shreds.len() as u64;
blocktree.insert_shreds(duplicate_shreds, None).unwrap();
assert_eq!(
blocktree.get_slot_entries(0, 0, None).unwrap(),
original_entries
);
let meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_shreds - 1);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_new_shreds_signal() {
let ledger_path = get_tmp_ledger_path("test_new_shreds_signal");
let (ledger, recvr, _) = Blocktree::open_with_signal(&ledger_path).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 50;
let (mut shreds, _) = make_slot_entries(0, 0, entries_per_slot);
let shreds_per_slot = shreds.len() as u64;
ledger.insert_shreds(vec![shreds.remove(1)], None).unwrap();
let timer = Duration::new(1, 0);
assert!(recvr.recv_timeout(timer).is_err());
ledger.insert_shreds(vec![shreds.remove(0)], None).unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(shreds, None).unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
let num_slots = shreds_per_slot;
let mut shreds = vec![];
let mut missing_shreds = vec![];
for slot in 1..num_slots + 1 {
let (mut slot_shreds, _) = make_slot_entries(slot, slot - 1, entries_per_slot);
let missing_shred = slot_shreds.remove(slot as usize - 1);
shreds.extend(slot_shreds);
missing_shreds.push(missing_shred);
}
ledger.insert_shreds(shreds, None).unwrap();
assert!(recvr.recv_timeout(timer).is_err());
let shreds: Vec<_> = (1..num_slots + 1)
.flat_map(|slot| {
let (mut shred, _) = make_slot_entries(slot, slot - 1, 1);
shred[0].set_index(2 * num_slots as u32);
shred
})
.collect();
ledger.insert_shreds(shreds, None).unwrap();
assert!(recvr.recv_timeout(timer).is_err());
let missing_shreds2 = missing_shreds
.drain((num_slots / 2) as usize..)
.collect_vec();
ledger.insert_shreds(missing_shreds, None).unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(missing_shreds2, None).unwrap();
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_completed_shreds_signal() {
let ledger_path = get_tmp_ledger_path("test_completed_shreds_signal");
let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
let (mut shreds, _) = make_slot_entries(0, 0, entries_per_slot);
let shred0 = shreds.remove(0);
ledger.insert_shreds(shreds, None).unwrap();
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(vec![shred0], None).unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![0]);
}
#[test]
pub fn test_completed_shreds_signal_orphans() {
let ledger_path = get_tmp_ledger_path("test_completed_shreds_signal_orphans");
let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
let slots = vec![2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot);
let (mut orphan_child, _) = all_shreds.remove(2);
let (mut orphan_shreds, _) = all_shreds.remove(1);
let orphan_child0 = orphan_child.remove(0);
ledger.insert_shreds(orphan_child, None).unwrap();
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(vec![orphan_child0], None).unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![slots[2]]);
let orphan_shred0 = orphan_shreds.remove(0);
ledger.insert_shreds(orphan_shreds, None).unwrap();
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(vec![orphan_shred0], None).unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![slots[1]]);
}
#[test]
pub fn test_completed_shreds_signal_many() {
let ledger_path = get_tmp_ledger_path("test_completed_shreds_signal_many");
let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
let mut slots = vec![2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot);
let disconnected_slot = 4;
let (shreds0, _) = all_shreds.remove(0);
let (shreds1, _) = all_shreds.remove(0);
let (shreds2, _) = all_shreds.remove(0);
let (shreds3, _) = make_slot_entries(disconnected_slot, 1, entries_per_slot);
let mut all_shreds: Vec<_> = vec![shreds0, shreds1, shreds2, shreds3]
.into_iter()
.flatten()
.collect();
all_shreds.shuffle(&mut thread_rng());
ledger.insert_shreds(all_shreds, None).unwrap();
let mut result = recvr.try_recv().unwrap();
result.sort();
slots.push(disconnected_slot);
slots.sort();
assert_eq!(result, slots);
}
#[test]
pub fn test_handle_chaining_basic() {
let blocktree_path = get_tmp_ledger_path("test_handle_chaining_basic");
{
let entries_per_slot = 5;
let num_slots = 3;
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let shreds_per_slot = shreds.len() / num_slots as usize;
let shreds1 = shreds
.drain(shreds_per_slot..2 * shreds_per_slot)
.collect_vec();
blocktree.insert_shreds(shreds1, None).unwrap();
let s1 = blocktree.meta(1).unwrap().unwrap();
assert!(s1.next_slots.is_empty());
assert!(!s1.is_connected);
assert_eq!(s1.parent_slot, 0);
assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);
let shreds2 = shreds
.drain(shreds_per_slot..2 * shreds_per_slot)
.collect_vec();
blocktree.insert_shreds(shreds2, None).unwrap();
let s2 = blocktree.meta(2).unwrap().unwrap();
assert!(s2.next_slots.is_empty());
assert!(!s2.is_connected);
assert_eq!(s2.parent_slot, 1);
assert_eq!(s2.last_index, shreds_per_slot as u64 - 1);
let s1 = blocktree.meta(1).unwrap().unwrap();
assert_eq!(s1.next_slots, vec![2]);
assert!(!s1.is_connected);
assert_eq!(s1.parent_slot, 0);
assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);
blocktree.insert_shreds(shreds, None).unwrap();
for i in 0..3 {
let s = blocktree.meta(i).unwrap().unwrap();
if i != 2 {
assert_eq!(s.next_slots, vec![i + 1]);
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
assert!(s.is_connected);
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_handle_chaining_missing_slots() {
let blocktree_path = get_tmp_ledger_path("test_handle_chaining_missing_slots");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 30;
let entries_per_slot = 5;
let mut slots = vec![];
let mut missing_slots = vec![];
let mut shreds_per_slot = 2;
for slot in 0..num_slots {
let parent_slot = {
if slot == 0 {
0
} else {
slot - 1
}
};
let (slot_shreds, _) = make_slot_entries(slot, parent_slot, entries_per_slot);
shreds_per_slot = slot_shreds.len();
if slot % 2 == 1 {
slots.extend(slot_shreds);
} else {
missing_slots.extend(slot_shreds);
}
}
blocktree.insert_shreds(slots, None).unwrap();
for i in 0..num_slots {
let s = blocktree.meta(i as u64).unwrap().unwrap();
if i % 2 == 0 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
assert_eq!(s.parent_slot, std::u64::MAX);
} else {
assert!(s.next_slots.is_empty());
assert_eq!(s.parent_slot, i - 1);
}
if i == 0 {
assert!(s.is_connected);
} else {
assert!(!s.is_connected);
}
}
blocktree.insert_shreds(missing_slots, None).unwrap();
for i in 0..num_slots {
let s = blocktree.meta(i as u64).unwrap().unwrap();
if i != num_slots - 1 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
} else {
assert!(s.next_slots.is_empty());
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
assert!(s.is_connected);
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_forward_chaining_is_connected() {
let blocktree_path = get_tmp_ledger_path("test_forward_chaining_is_connected");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 15;
let entries_per_slot = max_ticks_per_n_shreds(1) + 1;
assert!(entries_per_slot > 1);
let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let shreds_per_slot = shreds.len() / num_slots as usize;
assert!(shreds_per_slot > 1);
let mut missing_shreds = vec![];
for slot in 0..num_slots {
let mut shreds_for_slot = shreds.drain(..shreds_per_slot).collect_vec();
if slot % 3 == 0 {
let shred0 = shreds_for_slot.remove(0);
missing_shreds.push(shred0);
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
} else {
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
}
}
for i in 0..num_slots {
let s = blocktree.meta(i as u64).unwrap().unwrap();
if i as u64 != num_slots - 1 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
} else {
assert!(s.next_slots.is_empty());
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
if i != 0 {
assert!(!s.is_connected);
} else {
assert!(s.is_connected);
}
}
for slot_index in 0..num_slots {
if slot_index % 3 == 0 {
let shred = missing_shreds.remove(0);
blocktree.insert_shreds(vec![shred], None).unwrap();
for i in 0..num_slots {
let s = blocktree.meta(i as u64).unwrap().unwrap();
if i != num_slots - 1 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
} else {
assert!(s.next_slots.is_empty());
}
if i <= slot_index as u64 + 3 {
assert!(s.is_connected);
} else {
assert!(!s.is_connected);
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
}
}
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_get_slots_since() {
let blocktree_path = get_tmp_ledger_path("test_get_slots_since");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty());
let mut meta0 = SlotMeta::new(0, 0);
blocktree.meta_cf.put(0, &meta0).unwrap();
let expected: HashMap<u64, Vec<u64>> =
HashMap::from_iter(vec![(0, vec![])].into_iter());
assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected);
meta0.next_slots = vec![1, 2];
blocktree.meta_cf.put(0, &meta0).unwrap();
let expected: HashMap<u64, Vec<u64>> =
HashMap::from_iter(vec![(0, vec![1, 2])].into_iter());
assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected);
assert_eq!(blocktree.get_slots_since(&vec![0, 1]).unwrap(), expected);
let mut meta3 = SlotMeta::new(3, 1);
meta3.next_slots = vec![10, 5];
blocktree.meta_cf.put(3, &meta3).unwrap();
let expected: HashMap<u64, Vec<u64>> =
HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter());
assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_orphans() {
let blocktree_path = get_tmp_ledger_path("test_orphans");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let entries_per_slot = 1;
let (mut shreds, _) = make_many_slot_entries(0, 3, entries_per_slot);
let shreds_per_slot = shreds.len() / 3;
let shreds_for_slot = shreds.drain((shreds_per_slot * 2)..).collect_vec();
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
let meta = blocktree
.meta(1)
.expect("Expect database get to succeed")
.unwrap();
assert!(is_orphan(&meta));
assert_eq!(blocktree.get_orphans(None), vec![1]);
let shreds_for_slot = shreds.drain(shreds_per_slot..).collect_vec();
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
let meta = blocktree
.meta(1)
.expect("Expect database get to succeed")
.unwrap();
assert!(!is_orphan(&meta));
let meta = blocktree
.meta(0)
.expect("Expect database get to succeed")
.unwrap();
assert!(is_orphan(&meta));
assert_eq!(blocktree.get_orphans(None), vec![0]);
let (shred4, _) = make_slot_entries(4, 0, 1);
let (shred5, _) = make_slot_entries(5, 1, 1);
blocktree.insert_shreds(shred4, None).unwrap();
blocktree.insert_shreds(shred5, None).unwrap();
assert_eq!(blocktree.get_orphans(None), vec![0]);
blocktree.insert_shreds(shreds, None).unwrap();
for i in 0..3 {
let meta = blocktree
.meta(i)
.expect("Expect database get to succeed")
.unwrap();
assert!(!is_orphan(&meta));
}
assert!(blocktree.orphans_cf.is_empty().unwrap())
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
fn test_insert_data_shreds_slots(name: &str, should_bulk_write: bool) {
let blocktree_path = get_tmp_ledger_path(name);
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_entries = 20 as u64;
let mut entries = vec![];
let mut shreds = vec![];
let mut num_shreds_per_slot = 0;
for slot in 0..num_entries {
let parent_slot = {
if slot == 0 {
0
} else {
slot - 1
}
};
let (mut shred, entry) = make_slot_entries(slot, parent_slot, 1);
num_shreds_per_slot = shred.len() as u64;
shred
.iter_mut()
.enumerate()
.for_each(|(_, shred)| shred.set_index(0));
shreds.extend(shred);
entries.extend(entry);
}
let num_shreds = shreds.len();
if should_bulk_write {
blocktree.insert_shreds(shreds, None).unwrap();
} else {
for _ in 0..num_shreds {
let shred = shreds.remove(0);
blocktree.insert_shreds(vec![shred], None).unwrap();
}
}
for i in 0..num_entries - 1 {
assert_eq!(
blocktree.get_slot_entries(i, 0, None).unwrap()[0],
entries[i as usize]
);
let meta = blocktree.meta(i).unwrap().unwrap();
assert_eq!(meta.received, 1);
assert_eq!(meta.last_index, 0);
if i != 0 {
assert_eq!(meta.parent_slot, i - 1);
assert_eq!(meta.consumed, 1);
} else {
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.consumed, num_shreds_per_slot);
}
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_find_missing_data_indexes() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let gap: u64 = 10;
assert!(gap > 3);
let num_entries = max_ticks_per_n_shreds(1) + 1;
let entries = create_ticks(num_entries, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
let num_shreds = shreds.len();
assert!(num_shreds > 1);
for (i, s) in shreds.iter_mut().enumerate() {
s.set_index(i as u32 * gap as u32);
s.set_slot(slot);
}
blocktree.insert_shreds(shreds, None).unwrap();
let expected: Vec<u64> = (1..gap).collect();
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
expected
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
vec![gap - 2, gap - 1],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
vec![gap - 2],
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap, 1),
vec![1],
);
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
expected,
);
assert_eq!(
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
for i in 0..num_shreds as u64 {
for j in 0..i {
let expected: Vec<u64> = (j..i)
.flat_map(|k| {
let begin = k * gap + 1;
let end = (k + 1) * gap;
(begin..end)
})
.collect();
assert_eq!(
blocktree.find_missing_data_indexes(
slot,
j * gap,
i * gap,
((i - j) * gap) as usize
),
expected,
);
}
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_find_missing_data_indexes_sanity() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let empty: Vec<u64> = vec![];
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
let entries = create_ticks(100, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
assert!(shreds.len() > 2);
shreds.drain(2..);
const ONE: u64 = 1;
const OTHER: u64 = 4;
shreds[0].set_index(ONE as u32);
shreds[1].set_index(OTHER as u32);
blocktree.insert_shreds(shreds, None).unwrap();
const STARTS: u64 = OTHER * 2;
const END: u64 = OTHER * 3;
const MAX: usize = 10;
for start in 0..STARTS {
let result = blocktree.find_missing_data_indexes(
slot, start,
END,
MAX,
);
let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
assert_eq!(result, expected);
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_no_missing_shred_indexes() {
let slot = 0;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_entries = 10;
let entries = create_ticks(num_entries, Hash::default());
let shreds = entries_to_test_shreds(entries, slot, 0, true);
let num_shreds = shreds.len();
blocktree.insert_shreds(shreds, None).unwrap();
let empty: Vec<u64> = vec![];
for i in 0..num_shreds as u64 {
for j in 0..i {
assert_eq!(
blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
empty
);
}
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_should_insert_data_shred() {
let (mut shreds, _) = make_slot_entries(0, 0, 200);
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let index_cf = blocktree.db.column::<cf::Index>();
let last_root = RwLock::new(0);
blocktree
.insert_shreds(shreds[0..5].to_vec(), None)
.unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert_eq!(
Blocktree::should_insert_data_shred(
&shreds[1],
&slot_meta,
index.data(),
&last_root
),
false
);
blocktree
.insert_shreds(shreds[6..7].to_vec(), None)
.unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(
Blocktree::should_insert_data_shred(
&shreds[6],
&slot_meta,
index.data(),
&last_root
),
false
);
blocktree
.insert_shreds(shreds[8..9].to_vec(), None)
.unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9);
let shred7 = {
if shreds[7].is_data() {
shreds[7].set_last_in_slot();
shreds[7].clone()
} else {
panic!("Shred in unexpected format")
}
};
assert_eq!(
Blocktree::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root),
false
);
let mut shred8 = shreds[8].clone();
blocktree.insert_shreds(shreds, None).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
if shred8.is_data() {
shred8.set_slot(slot_meta.last_index + 1);
} else {
panic!("Shred in unexpected format")
}
assert_eq!(
Blocktree::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root),
false
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_should_insert_coding_shred() {
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let index_cf = blocktree.db.column::<cf::Index>();
let last_root = RwLock::new(0);
let slot = 1;
let (mut shred, coding) = Shredder::new_coding_shred_header(slot, 11, 11, 11, 10);
let coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
Index::new(slot).coding(),
&last_root
));
blocktree
.insert_shreds(vec![coding_shred.clone()], None)
.unwrap();
{
let index = index_cf.get(shred.slot).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
shred.index += 1;
{
let coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = index_cf.get(shred.slot).unwrap().unwrap();
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = coding_shred.coding_header.position - 1;
coding_shred.set_index(index as u32);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 3;
coding_shred.common_header.index = std::u32::MAX - 1;
coding_shred.coding_header.position = 0;
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
coding_shred.coding_header.num_coding_shreds = 2;
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
blocktree.insert_shreds(vec![coding_shred], None).unwrap();
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = index_cf.get(coding_shred.slot()).unwrap().unwrap();
coding_shred.set_slot(*last_root.read().unwrap());
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_multiple_is_last() {
let (shreds, _) = make_slot_entries(0, 0, 20);
let num_shreds = shreds.len() as u64;
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree.insert_shreds(shreds, None).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, num_shreds);
assert_eq!(slot_meta.received, num_shreds);
assert_eq!(slot_meta.last_index, num_shreds - 1);
assert!(slot_meta.is_full());
let (shreds, _) = make_slot_entries(0, 0, 22);
blocktree.insert_shreds(shreds, None).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, num_shreds);
assert_eq!(slot_meta.received, num_shreds);
assert_eq!(slot_meta.last_index, num_shreds - 1);
assert!(slot_meta.is_full());
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_slot_data_iterator() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let shreds_per_slot = 10;
let slots = vec![2, 4, 8, 12];
let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot);
let slot_8_shreds = all_shreds[2].0.clone();
for (slot_shreds, _) in all_shreds {
blocktree.insert_shreds(slot_shreds, None).unwrap();
}
let shred_iter = blocktree.slot_data_iterator(5).unwrap();
let result: Vec<_> = shred_iter.collect();
assert_eq!(result, vec![]);
let shred_iter = blocktree.slot_data_iterator(8).unwrap();
let result: Vec<Shred> = shred_iter
.filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok())
.collect();
assert_eq!(result.len(), slot_8_shreds.len());
assert_eq!(result, slot_8_shreds);
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_set_roots() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let chained_slots = vec![0, 2, 4, 7, 12, 15];
assert_eq!(blocktree.last_root(), 0);
blocktree.set_roots(&chained_slots).unwrap();
assert_eq!(blocktree.last_root(), 15);
for i in chained_slots {
assert!(blocktree.is_root(i));
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_prune() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 6);
let shreds_per_slot = shreds.len() as u64 / 50;
blocktree.insert_shreds(shreds, None).unwrap();
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|(_, meta)| assert_eq!(meta.last_index, shreds_per_slot - 1));
blocktree.prune(5);
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, meta)| {
assert!(slot <= 5);
assert_eq!(meta.last_index, shreds_per_slot - 1)
});
let data_iter = blocktree
.data_shred_cf
.iter(IteratorMode::From((0, 0), IteratorDirection::Forward))
.unwrap();
for ((slot, _), _) in data_iter {
if slot > 5 {
assert!(false);
}
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_purge_slots() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.purge_slots(0, Some(5));
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| {
assert!(slot > 5);
});
blocktree.purge_slots(0, None);
blocktree.slot_meta_iterator(0).unwrap().for_each(|(_, _)| {
assert!(false);
});
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_purge_huge() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 5000, 10);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.purge_slots(0, Some(4999));
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| {
assert_eq!(slot, 5000);
});
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[should_panic]
#[test]
fn test_prune_out_of_bounds() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree.prune(5);
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_iter_bounds() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree
.slot_meta_iterator(5)
.unwrap()
.for_each(|_| assert!(false));
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_get_completed_data_ranges() {
let completed_data_end_indexes = vec![2, 4, 9, 11];
let start_index = 0;
let consumed = 1;
assert_eq!(
Blocktree::get_completed_data_ranges(
start_index,
&completed_data_end_indexes[..],
consumed
),
vec![]
);
let start_index = 0;
let consumed = 3;
assert_eq!(
Blocktree::get_completed_data_ranges(
start_index,
&completed_data_end_indexes[..],
consumed
),
vec![(0, 2)]
);
for i in 0..completed_data_end_indexes.len() {
for j in i..completed_data_end_indexes.len() {
let start_index = completed_data_end_indexes[i];
let consumed = completed_data_end_indexes[j] + 1;
let mut expected = vec![(start_index, start_index)];
expected.extend(
completed_data_end_indexes[i..=j]
.windows(2)
.map(|end_indexes| (end_indexes[0] + 1, end_indexes[1])),
);
assert_eq!(
Blocktree::get_completed_data_ranges(
start_index,
&completed_data_end_indexes[..],
consumed
),
expected
);
}
}
}
#[test]
fn test_get_slot_entries_with_shred_count_corruption() {
let blocktree_path =
get_tmp_ledger_path("test_get_slot_entries_with_shred_count_corruption");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_ticks = 8;
let entries = create_ticks(num_ticks, Hash::default());
let slot = 1;
let shreds = entries_to_test_shreds(entries, slot, 0, false);
let next_shred_index = shreds.len();
blocktree
.insert_shreds(shreds, None)
.expect("Expected successful write of shreds");
assert_eq!(
blocktree.get_slot_entries(slot, 0, None).unwrap().len() as u64,
num_ticks
);
let shreds = vec![Shred::new_from_data(
slot,
next_shred_index as u32,
1,
None,
true,
true,
)];
blocktree
.insert_shreds(shreds, None)
.expect("Expected successful write of shreds");
assert!(blocktree.get_slot_entries(slot, 0, None).is_err());
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_no_insert_but_modify_slot_meta() {
let (shreds0, _) = make_slot_entries(0, 0, 200);
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree
.insert_shreds(shreds0[0..5].to_vec(), None)
.unwrap();
let (mut shreds2, _) = make_slot_entries(2, 0, 200);
let (mut shreds3, _) = make_slot_entries(3, 0, 200);
shreds2.push(shreds0[1].clone());
shreds3.insert(0, shreds0[1].clone());
blocktree.insert_shreds(shreds2, None).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.next_slots, vec![2]);
blocktree.insert_shreds(shreds3, None).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.next_slots, vec![2, 3]);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
}