use crate::{
config::TangleConfig,
metadata::{IndexId, MessageMetadata},
solid_entry_point::SolidEntryPoint,
storage::StorageBackend,
tangle::{Hooks, Tangle, DEFAULT_CACHE_LEN},
urts::UrtsTipPool,
MessageRef,
};
use bee_message::{
milestone::{Milestone, MilestoneIndex},
Message, MessageId,
};
use bee_runtime::resource::ResourceHandle;
use async_trait::async_trait;
use hashbrown::HashMap;
use log::{info, trace};
use ref_cast::RefCast;
use tokio::sync::Mutex;
use std::{
ops::Deref,
sync::atomic::{AtomicU32, Ordering},
};
/// Largest allowed gap between the latest and the solid milestone index for which
/// `MsTangle::is_synced` still reports `true`.
const SYNCED_THRESHOLD: u32 = 2;
/// Largest allowed gap between the latest and the confirmed milestone index for which
/// `MsTangle::is_confirmed` still reports `true`.
const CONFIRMED_THRESHOLD: u32 = 2;
/// Hooks implementation that forwards every tangle persistence operation to a storage backend.
///
/// An instance is created by `MsTangle::new` and handed to the inner `Tangle`.
pub struct StorageHooks<B> {
// NOTE(review): `storage` is read by the methods implemented on this type in this file, so
// this `allow` looks unnecessary — confirm before removing it.
#[allow(dead_code)]
/// Handle to the storage backend that all hook methods delegate to.
storage: ResourceHandle<B>,
}
#[async_trait]
impl<B: StorageBackend> Hooks<MessageMetadata> for StorageHooks<B> {
    type Error = B::Error;

    /// Loads the message and its metadata stored under `id`.
    ///
    /// Both lookups are always performed; `Some` is returned only when both records exist.
    async fn get(&self, id: &MessageId) -> Result<Option<(Message, MessageMetadata)>, Self::Error> {
        trace!("Attempted to fetch message {:?}", id);
        let message: Option<Message> = self.storage.fetch(id).await?;
        let metadata: Option<MessageMetadata> = self.storage.fetch(id).await?;
        Ok(match (message, metadata) {
            (Some(message), Some(metadata)) => Some((message, metadata)),
            _ => None,
        })
    }

    /// Stores the message first and then its metadata, both keyed by `id`.
    ///
    /// Stops at the first storage error, so the metadata is not written if the message write fails.
    async fn insert(&self, id: MessageId, tx: Message, metadata: MessageMetadata) -> Result<(), Self::Error> {
        trace!("Attempted to insert message {:?}", id);
        self.storage.insert(&id, &tx).await?;
        self.storage.insert(&id, &metadata).await?;
        Ok(())
    }

    /// Loads the list of approvers recorded for `id`, if any.
    async fn fetch_approvers(&self, id: &MessageId) -> Result<Option<Vec<MessageId>>, Self::Error> {
        trace!("Attempted to fetch approvers for message {:?}", id);
        let approvers: Option<Vec<MessageId>> = self.storage.fetch(id).await?;
        Ok(approvers)
    }

    /// Records a single `(id, approver)` pair in storage.
    async fn insert_approver(&self, id: MessageId, approver: MessageId) -> Result<(), Self::Error> {
        trace!("Attempted to insert approver for message {:?}", id);
        self.storage.insert(&(id, approver), &()).await?;
        Ok(())
    }

    /// Records one `(id, approver)` pair per entry of `approvers`, in slice order.
    ///
    /// Stops at the first storage error; pairs written before the failure stay written.
    async fn update_approvers(&self, id: MessageId, approvers: &[MessageId]) -> Result<(), Self::Error> {
        trace!("Attempted to update approvers for message {:?}", id);
        for approver in approvers.iter().copied() {
            self.storage.insert(&(id, approver), &()).await?;
        }
        Ok(())
    }
}
impl<B: StorageBackend> StorageHooks<B> {
    /// Loads the milestone stored under `idx` from the backend, if there is one.
    async fn get_milestone(&self, idx: &MilestoneIndex) -> Result<Option<Milestone>, B::Error> {
        trace!("Attempted to fetch milestone {:?}", idx);
        let milestone: Option<Milestone> = self.storage.fetch(idx).await?;
        Ok(milestone)
    }

    /// Writes `milestone` to the backend under `idx`.
    async fn insert_milestone(&self, idx: MilestoneIndex, milestone: &Milestone) -> Result<(), B::Error> {
        trace!("Attempted to insert milestone {:?}", idx);
        self.storage.insert(&idx, milestone).await
    }
}
/// Milestone-aware wrapper around the generic `Tangle`.
///
/// On top of the inner tangle it keeps an in-memory milestone map, the set of solid entry
/// points, several milestone-index counters and the tip pool.
pub struct MsTangle<B> {
/// Configuration this tangle was created with.
config: TangleConfig,
/// Underlying tangle; also reachable through `Deref`.
inner: Tangle<MessageMetadata, StorageHooks<B>>,
/// In-memory map from milestone index to milestone.
milestones: Mutex<HashMap<MilestoneIndex, Milestone>>,
/// Solid entry points and the milestone index associated with each.
solid_entry_points: Mutex<HashMap<SolidEntryPoint, MilestoneIndex>>,
/// Raw value of the latest milestone index.
latest_milestone_index: AtomicU32,
/// Raw value of the solid milestone index.
solid_milestone_index: AtomicU32,
/// Raw value of the confirmed milestone index.
confirmed_milestone_index: AtomicU32,
/// Raw value of the snapshot index.
snapshot_index: AtomicU32,
/// Raw value of the pruning index.
pruning_index: AtomicU32,
/// Raw value of the entry point index.
entry_point_index: AtomicU32,
/// Tip pool used for tip bookkeeping and tip selection.
tip_pool: Mutex<UrtsTipPool>,
}
/// Exposes the methods of the inner `Tangle` directly on `MsTangle`.
impl<B> Deref for MsTangle<B> {
    type Target = Tangle<MessageMetadata, StorageHooks<B>>;

    /// Borrows the wrapped inner tangle.
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
impl<B: StorageBackend> MsTangle<B> {
    /// Creates an empty tangle that persists through `storage`.
    ///
    /// All milestone-related indices start at their default value and both the milestone map
    /// and the solid entry point set start empty.
    pub fn new(config: TangleConfig, storage: ResourceHandle<B>) -> Self {
        Self {
            inner: Tangle::new(StorageHooks { storage }),
            milestones: Default::default(),
            solid_entry_points: Default::default(),
            latest_milestone_index: Default::default(),
            solid_milestone_index: Default::default(),
            confirmed_milestone_index: Default::default(),
            snapshot_index: Default::default(),
            pruning_index: Default::default(),
            entry_point_index: Default::default(),
            // The tip pool only borrows the config, so build it before `config` is moved in.
            tip_pool: Mutex::new(UrtsTipPool::new(&config)),
            config,
        }
    }

    /// Consumes the tangle. Currently performs no additional work.
    pub async fn shutdown(self) {}

    /// Returns the configuration this tangle was created with.
    pub fn config(&self) -> &TangleConfig {
        &self.config
    }

    /// Inserts `message` with the given id and metadata into the inner tangle and returns
    /// whatever the inner tangle's `insert` returns.
    pub async fn insert(&self, message: Message, hash: MessageId, metadata: MessageMetadata) -> Option<MessageRef> {
        self.inner.insert(hash, message, metadata).await
    }

    /// Registers `milestone` under index `idx`.
    ///
    /// The metadata of the milestone's message is flagged as a milestone and its milestone
    /// index, OMRSI and YMRSI are set to `idx`. The milestone is then written to storage
    /// and added to the in-memory milestone map. A storage failure is logged and does not
    /// prevent the in-memory insertion.
    pub async fn add_milestone(&self, idx: MilestoneIndex, milestone: Milestone) {
        self.inner
            .update_metadata(&milestone.message_id(), |metadata| {
                metadata.flags_mut().set_milestone(true);
                metadata.set_milestone_index(idx);
                metadata.set_omrsi(IndexId::new(idx, *milestone.message_id()));
                metadata.set_ymrsi(IndexId::new(idx, *milestone.message_id()));
            })
            .await;

        self.inner
            .hooks()
            .insert_milestone(idx, &milestone)
            .await
            .unwrap_or_else(|e| info!("Failed to insert milestone {:?}", e));

        self.milestones.lock().await.insert(idx, milestone);
    }

    /// Removes the milestone with the given index from the in-memory milestone map.
    pub async fn remove_milestone(&self, index: MilestoneIndex) {
        self.milestones.lock().await.remove(&index);
    }

    /// Loads milestone `idx` from storage into the in-memory map and returns the id of its
    /// message.
    ///
    /// If the map already has an entry for `idx`, that entry is kept and its message id is
    /// returned. Returns `None` when storage has no such milestone or the fetch fails; a
    /// failure is logged.
    async fn pull_milestone(&self, idx: MilestoneIndex) -> Option<MessageId> {
        let milestone = self.inner.hooks().get_milestone(&idx).await.unwrap_or_else(|e| {
            info!("Failed to fetch milestone {:?}", e);
            None
        })?;

        let message_id = *self
            .milestones
            .lock()
            .await
            .entry(idx)
            .or_insert(milestone)
            .message_id();
        Some(message_id)
    }

    /// Returns a clone of the milestone held in the in-memory map for `index`.
    ///
    /// This lookup does not consult storage.
    pub async fn get_milestone(&self, index: MilestoneIndex) -> Option<Milestone> {
        self.milestones.lock().await.get(&index).cloned()
    }

    /// Returns the message belonging to the milestone with the given index, if both the
    /// milestone and its message can be found.
    pub async fn get_milestone_message(&self, index: MilestoneIndex) -> Option<MessageRef> {
        let message_id = self.get_milestone_message_id(index).await?;
        self.get(&message_id).await
    }

    /// Returns the message id of the milestone with the given index.
    ///
    /// The in-memory map is checked first; on a miss the milestone is pulled from storage.
    pub async fn get_milestone_message_id(&self, index: MilestoneIndex) -> Option<MessageId> {
        // The lock guard is a temporary of this statement, so it is released before the
        // storage fallback below runs.
        let cached = self.milestones.lock().await.get(&index).map(|m| *m.message_id());
        match cached {
            Some(message_id) => Some(message_id),
            None => self.pull_milestone(index).await,
        }
    }

    /// Returns `true` if the milestone with the given index is in the in-memory map or can
    /// be pulled from storage.
    pub async fn contains_milestone(&self, idx: MilestoneIndex) -> bool {
        if self.milestones.lock().await.contains_key(&idx) {
            return true;
        }
        self.pull_milestone(idx).await.is_some()
    }

    /// Returns the latest milestone index.
    pub fn get_latest_milestone_index(&self) -> MilestoneIndex {
        self.latest_milestone_index.load(Ordering::Relaxed).into()
    }

    /// Sets the latest milestone index.
    pub fn update_latest_milestone_index(&self, new_index: MilestoneIndex) {
        self.latest_milestone_index.store(*new_index, Ordering::Relaxed);
    }

    /// Returns the solid milestone index.
    pub fn get_solid_milestone_index(&self) -> MilestoneIndex {
        self.solid_milestone_index.load(Ordering::Relaxed).into()
    }

    /// Sets the solid milestone index and resizes the inner tangle's cache.
    ///
    /// The requested length is `1000 + 500 * get_sync_threshold()`, first capped at
    /// `DEFAULT_CACHE_LEN` and then raised to at least 8192.
    pub fn update_solid_milestone_index(&self, new_index: MilestoneIndex) {
        self.solid_milestone_index.store(*new_index, Ordering::Relaxed);

        let new_len = ((1000.0 + self.get_sync_threshold() as f32 * 500.0) as usize)
            .min(DEFAULT_CACHE_LEN)
            .max(8192);
        self.inner.resize(new_len);
    }

    /// Returns the confirmed milestone index.
    pub fn get_confirmed_milestone_index(&self) -> MilestoneIndex {
        self.confirmed_milestone_index.load(Ordering::Relaxed).into()
    }

    /// Sets the confirmed milestone index.
    pub fn update_confirmed_milestone_index(&self, new_index: MilestoneIndex) {
        self.confirmed_milestone_index.store(*new_index, Ordering::Relaxed);
    }

    /// Returns the snapshot index.
    pub fn get_snapshot_index(&self) -> MilestoneIndex {
        self.snapshot_index.load(Ordering::Relaxed).into()
    }

    /// Sets the snapshot index.
    pub fn update_snapshot_index(&self, new_index: MilestoneIndex) {
        self.snapshot_index.store(*new_index, Ordering::Relaxed);
    }

    /// Returns the pruning index.
    pub fn get_pruning_index(&self) -> MilestoneIndex {
        self.pruning_index.load(Ordering::Relaxed).into()
    }

    /// Sets the pruning index.
    pub fn update_pruning_index(&self, new_index: MilestoneIndex) {
        self.pruning_index.store(*new_index, Ordering::Relaxed);
    }

    /// Returns the entry point index.
    pub fn get_entry_point_index(&self) -> MilestoneIndex {
        self.entry_point_index.load(Ordering::Relaxed).into()
    }

    /// Sets the entry point index.
    pub fn update_entry_point_index(&self, new_index: MilestoneIndex) {
        self.entry_point_index.store(*new_index, Ordering::Relaxed);
    }

    /// Returns `true` if the solid milestone index is at most `SYNCED_THRESHOLD` behind the
    /// latest milestone index.
    pub fn is_synced(&self) -> bool {
        self.is_synced_threshold(SYNCED_THRESHOLD)
    }

    /// Returns how far the solid milestone index is behind the latest one, saturating at 0.
    pub fn get_sync_threshold(&self) -> u32 {
        self.get_latest_milestone_index()
            .saturating_sub(*self.get_solid_milestone_index())
    }

    /// Returns `true` if the solid milestone index is at most `threshold` behind the latest
    /// milestone index.
    pub fn is_synced_threshold(&self, threshold: u32) -> bool {
        *self.get_solid_milestone_index() >= self.get_latest_milestone_index().saturating_sub(threshold)
    }

    /// Returns `true` if the confirmed milestone index is at most `CONFIRMED_THRESHOLD`
    /// behind the latest milestone index.
    pub fn is_confirmed(&self) -> bool {
        self.is_confirmed_threshold(CONFIRMED_THRESHOLD)
    }

    /// Returns `true` if the confirmed milestone index is at most `threshold` behind the
    /// latest milestone index.
    pub fn is_confirmed_threshold(&self, threshold: u32) -> bool {
        *self.get_confirmed_milestone_index() >= self.get_latest_milestone_index().saturating_sub(threshold)
    }

    /// Returns the milestone index registered for the solid entry point `sep`, if any.
    pub async fn get_solid_entry_point_index(&self, sep: &SolidEntryPoint) -> Option<MilestoneIndex> {
        self.solid_entry_points.lock().await.get(sep).copied()
    }

    /// Registers `sep` as a solid entry point with the given milestone index, replacing any
    /// previous index stored for it.
    pub async fn add_solid_entry_point(&self, sep: SolidEntryPoint, index: MilestoneIndex) {
        self.solid_entry_points.lock().await.insert(sep, index);
    }

    /// Removes `sep` from the set of solid entry points.
    pub async fn remove_solid_entry_point(&self, sep: &SolidEntryPoint) {
        self.solid_entry_points.lock().await.remove(sep);
    }

    /// Removes all solid entry points.
    pub async fn clear_solid_entry_points(&self) {
        self.solid_entry_points.lock().await.clear();
    }

    /// Returns `true` if `id` is registered as a solid entry point.
    pub async fn is_solid_entry_point(&self, id: &MessageId) -> bool {
        self.solid_entry_points
            .lock()
            .await
            .contains_key(SolidEntryPoint::ref_cast(id))
    }

    /// Returns `true` if `id` is a solid entry point or its metadata carries the solid flag.
    ///
    /// A message without metadata that is not a solid entry point is reported as not solid.
    pub async fn is_solid_message(&self, id: &MessageId) -> bool {
        if self.is_solid_entry_point(id).await {
            true
        } else {
            self.inner
                .get_metadata(id)
                .await
                .map(|metadata| metadata.flags().is_solid())
                .unwrap_or(false)
        }
    }

    /// Returns the OMRSI of `id`.
    ///
    /// For a solid entry point this is built from the entry point's milestone index and `id`;
    /// otherwise it is read from the message metadata.
    pub async fn omrsi(&self, id: &MessageId) -> Option<IndexId> {
        // Look the entry point up through the helper so the solid-entry-point lock is
        // released before the metadata lookup is awaited.
        let sep_index = self.get_solid_entry_point_index(SolidEntryPoint::ref_cast(id)).await;
        match sep_index {
            Some(index) => Some(IndexId::new(index, *id)),
            None => self.get_metadata(id).await.and_then(|metadata| metadata.omrsi()),
        }
    }

    /// Returns the YMRSI of `id`.
    ///
    /// For a solid entry point this is built from the entry point's milestone index and `id`;
    /// otherwise it is read from the message metadata.
    pub async fn ymrsi(&self, id: &MessageId) -> Option<IndexId> {
        // Look the entry point up through the helper so the solid-entry-point lock is
        // released before the metadata lookup is awaited.
        let sep_index = self.get_solid_entry_point_index(SolidEntryPoint::ref_cast(id)).await;
        match sep_index {
            Some(index) => Some(IndexId::new(index, *id)),
            None => self.get_metadata(id).await.and_then(|metadata| metadata.ymrsi()),
        }
    }

    /// Hands `message_id` and its `parents` to the tip pool's `insert`.
    pub async fn insert_tip(&self, message_id: MessageId, parents: Vec<MessageId>) {
        self.tip_pool.lock().await.insert(&self, message_id, parents).await;
    }

    /// Asks the tip pool to update its scores against this tangle.
    pub async fn update_tip_scores(&self) {
        self.tip_pool.lock().await.update_scores(&self).await;
    }

    /// Returns the result of the tip pool's `choose_non_lazy_tips`.
    pub async fn get_messages_to_approve(&self) -> Option<Vec<MessageId>> {
        self.tip_pool.lock().await.choose_non_lazy_tips()
    }

    /// Calls the tip pool's `reduce_tips`.
    pub async fn reduce_tips(&self) {
        self.tip_pool.lock().await.reduce_tips();
    }

    /// Returns the number of non-lazy tips currently held by the tip pool.
    pub async fn non_lazy_tips_num(&self) -> usize {
        self.tip_pool.lock().await.non_lazy_tips().len()
    }
}