bee_tangle/
tangle.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    config::TangleConfig,
6    metadata::{IndexId, MessageMetadata},
7    solid_entry_point::SolidEntryPoint,
8    storage::StorageBackend,
9    urts::UrtsTipPool,
10    vertex::Vertex,
11    vertices::Vertices,
12    MessageRef,
13};
14
15use bee_message::{
16    milestone::{Milestone, MilestoneIndex},
17    Message, MessageId,
18};
19use bee_runtime::resource::ResourceHandle;
20
21use hashbrown::HashMap;
22use log::info;
23use ref_cast::RefCast;
24use tokio::sync::Mutex;
25
26use std::{
27    marker::PhantomData,
28    ops::{Deref, DerefMut},
29    sync::atomic::{AtomicU32, AtomicUsize, Ordering},
30};
31
// Initial value of the vertex cache limit (`max_len`) and upper bound used when the limit is recomputed.
const DEFAULT_CACHE_LEN: usize = 100_000;
// Fraction of the cache limit that eviction frees below the limit once the limit has been exceeded.
const CACHE_THRESHOLD_FACTOR: f64 = 0.1;
// Threshold, in milestone indices, that `is_synced` passes to `is_synced_threshold`.
const SYNCED_THRESHOLD: u32 = 2;
// Threshold, in milestone indices, that `is_confirmed` passes to `is_confirmed_threshold`.
const CONFIRMED_THRESHOLD: u32 = 2;
36
/// A Tangle wrapper designed to encapsulate milestone state.
///
/// Combines an in-memory vertex store with a storage backend of type `B`, and keeps track of milestones,
/// milestone-related indices, solid entry points and the tip pool.
pub struct Tangle<B> {
    /// Configuration this tangle was created with.
    config: TangleConfig,
    /// In-memory vertices, keyed by message ID.
    vertices: Vertices,
    /// Number of in-memory vertices above which eviction is attempted.
    max_len: AtomicUsize,
    /// Handle to the storage backend.
    storage: ResourceHandle<B>,
    /// In-memory milestones, keyed by milestone index.
    milestones: Mutex<HashMap<MilestoneIndex, Milestone>>,
    /// Solid entry points and the milestone index recorded for each of them.
    solid_entry_points: Mutex<HashMap<SolidEntryPoint, MilestoneIndex>>,
    /// Index of the latest milestone.
    latest_milestone_index: AtomicU32,
    /// Index of the latest solid milestone.
    solid_milestone_index: AtomicU32,
    /// Index of the latest confirmed milestone.
    confirmed_milestone_index: AtomicU32,
    /// Snapshot index.
    snapshot_index: AtomicU32,
    /// Pruning index.
    pruning_index: AtomicU32,
    /// Entry point index.
    entry_point_index: AtomicU32,
    /// Tip pool used for tip insertion, scoring and selection.
    tip_pool: Mutex<UrtsTipPool>,
}
53
54impl<B: StorageBackend> Tangle<B> {
55    /// Create a new `Tangle` instance with the given configuration and storage handle.
56    pub fn new(config: TangleConfig, storage: ResourceHandle<B>) -> Self {
57        Self {
58            vertices: Vertices::new(config.num_partitions()),
59            max_len: AtomicUsize::new(DEFAULT_CACHE_LEN),
60            storage,
61            milestones: Default::default(),
62            solid_entry_points: Default::default(),
63            latest_milestone_index: Default::default(),
64            solid_milestone_index: Default::default(),
65            confirmed_milestone_index: Default::default(),
66            snapshot_index: Default::default(),
67            pruning_index: Default::default(),
68            entry_point_index: Default::default(),
69            tip_pool: Mutex::new(UrtsTipPool::new(&config)),
70            config,
71        }
72    }
73
    /// Shut down the tangle, consuming it.
    ///
    /// NOTE(review): the body is currently empty, so nothing is written back or terminated explicitly; the
    /// tangle is simply dropped when this function returns.
    pub async fn shutdown(self) {
        // TODO: Write back changes by calling self.inner.shutdown().await
    }
78
79    /// Get the configuration of this tangle.
80    pub fn config(&self) -> &TangleConfig {
81        &self.config
82    }
83
84    /// Insert a message into the tangle.
85    pub async fn insert(
86        &self,
87        message: Message,
88        message_id: MessageId,
89        metadata: MessageMetadata,
90    ) -> Option<MessageRef> {
91        let exists = self.pull_message(&message_id, true).await;
92
93        let msg = self.insert_inner(message_id, message.clone(), metadata, !exists).await;
94
95        self.vertices
96            .get_mut(&message_id)
97            .await
98            .expect("Just-inserted message is missing")
99            .allow_eviction();
100
101        if msg.is_some() {
102            // Write parents to DB
103            for &parent in message.parents().iter() {
104                self.storage
105                    .insert(&(parent, message_id), &())
106                    .unwrap_or_else(|e| info!("Failed to update approvers for message {:?}", e));
107            }
108
109            // Insert into backend using hooks
110            self.storage_insert(message_id, message, metadata)
111                .unwrap_or_else(|e| info!("Failed to insert message {:?}", e));
112        }
113
114        msg
115    }
116
117    /// Add a milestone to the tangle.
118    pub async fn add_milestone(&self, idx: MilestoneIndex, milestone: Milestone) {
119        // TODO: only insert if vacant
120        self.update_metadata(milestone.message_id(), |metadata| {
121            metadata.flags_mut().set_milestone(true);
122            metadata.set_milestone_index(idx);
123            metadata.set_omrsi(IndexId::new(idx, *milestone.message_id()));
124            metadata.set_ymrsi(IndexId::new(idx, *milestone.message_id()));
125        })
126        .await;
127        self.storage()
128            .insert(&idx, &milestone)
129            .unwrap_or_else(|e| info!("Failed to insert message {:?}", e));
130        self.milestones.lock().await.insert(idx, milestone);
131    }
132
133    /// Remove a milestone from the tangle.
134    pub async fn remove_milestone(&self, index: MilestoneIndex) {
135        self.milestones.lock().await.remove(&index);
136    }
137
138    async fn pull_milestone(&self, idx: MilestoneIndex) -> Option<MessageId> {
139        if let Some(milestone) = self.storage().fetch(&idx).unwrap_or_else(|e| {
140            info!("Failed to insert message {:?}", e);
141            None
142        }) {
143            let message_id = *self
144                .milestones
145                .lock()
146                .await
147                .entry(idx)
148                .or_insert(milestone)
149                .message_id();
150
151            Some(message_id)
152        } else {
153            None
154        }
155    }
156
157    /// Get the milestone from the tangle that corresponds to the given milestone index.
158    pub async fn get_milestone(&self, index: MilestoneIndex) -> Option<Milestone> {
159        self.milestones.lock().await.get(&index).cloned()
160    }
161
162    /// Get the message associated with the given milestone index from the tangle.
163    pub async fn get_milestone_message(&self, index: MilestoneIndex) -> Option<MessageRef> {
164        // TODO: use combinator instead of match
165        match self.get_milestone_message_id(index).await {
166            None => None,
167            Some(ref hash) => self.get(hash).await,
168        }
169    }
170
171    /// Get the message ID associated with the given milestone index from the tangle.
172    pub async fn get_milestone_message_id(&self, index: MilestoneIndex) -> Option<MessageId> {
173        let message_id = self.milestones.lock().await.get(&index).map(|m| *m.message_id());
174
175        // TODO: use combinator instead of match
176        match message_id {
177            Some(m) => Some(m),
178            None => Some(self.pull_milestone(index).await?),
179        }
180    }
181
182    /// Return whether the tangle contains the given milestone index.
183    pub async fn contains_milestone(&self, idx: MilestoneIndex) -> bool {
184        // Not using `||` as its first operand would keep the lock alive causing a deadlock with its second operand.
185        if self.milestones.lock().await.contains_key(&idx) {
186            return true;
187        }
188        self.pull_milestone(idx).await.is_some()
189    }
190
191    /// Get the index of the latest milestone.
192    pub fn get_latest_milestone_index(&self) -> MilestoneIndex {
193        self.latest_milestone_index.load(Ordering::Relaxed).into()
194    }
195
196    /// Update the index of the lastest milestone.
197    pub fn update_latest_milestone_index(&self, new_index: MilestoneIndex) {
198        // TODO: `fetch_max`? Swap and ensure the old is smaller?
199        self.latest_milestone_index.store(*new_index, Ordering::Relaxed);
200    }
201
202    /// Get the latest solid milestone index.
203    pub fn get_solid_milestone_index(&self) -> MilestoneIndex {
204        self.solid_milestone_index.load(Ordering::Relaxed).into()
205    }
206
207    /// Update the latest solid milestone index.
208    pub fn update_solid_milestone_index(&self, new_index: MilestoneIndex) {
209        self.solid_milestone_index.store(*new_index, Ordering::Relaxed);
210
211        // TODO: Formalise this a little better
212        let new_len = ((1000.0 + self.get_sync_threshold() as f32 * 500.0) as usize)
213            .min(DEFAULT_CACHE_LEN)
214            .max(8192);
215        self.resize(new_len);
216    }
217
218    /// Get the latest confirmed milestone index.
219    pub fn get_confirmed_milestone_index(&self) -> MilestoneIndex {
220        self.confirmed_milestone_index.load(Ordering::Relaxed).into()
221    }
222
223    /// Update the latest confirmed milestone index.
224    pub fn update_confirmed_milestone_index(&self, new_index: MilestoneIndex) {
225        self.confirmed_milestone_index.store(*new_index, Ordering::Relaxed);
226    }
227
228    /// Get the snapshot index.
229    pub fn get_snapshot_index(&self) -> MilestoneIndex {
230        self.snapshot_index.load(Ordering::Relaxed).into()
231    }
232
233    /// Update the snapshot index.
234    pub fn update_snapshot_index(&self, new_index: MilestoneIndex) {
235        self.snapshot_index.store(*new_index, Ordering::Relaxed);
236    }
237
238    /// Get the pruning index.
239    pub fn get_pruning_index(&self) -> MilestoneIndex {
240        self.pruning_index.load(Ordering::Relaxed).into()
241    }
242
243    /// Update the pruning index.
244    pub fn update_pruning_index(&self, new_index: MilestoneIndex) {
245        self.pruning_index.store(*new_index, Ordering::Relaxed);
246    }
247
248    /// Get the entry point index.
249    pub fn get_entry_point_index(&self) -> MilestoneIndex {
250        self.entry_point_index.load(Ordering::Relaxed).into()
251    }
252
253    /// Update the entry point index.
254    pub fn update_entry_point_index(&self, new_index: MilestoneIndex) {
255        self.entry_point_index.store(*new_index, Ordering::Relaxed);
256    }
257
258    /// Return whether the tangle is within the default sync threshold.
259    pub fn is_synced(&self) -> bool {
260        // TODO reduce to one atomic value ?
261        self.is_synced_threshold(SYNCED_THRESHOLD)
262    }
263
264    /// Get the number of milestones until the tangle is synced.
265    pub fn get_sync_threshold(&self) -> u32 {
266        // TODO reduce to one atomic value ?
267        self.get_latest_milestone_index()
268            .saturating_sub(*self.get_solid_milestone_index())
269    }
270
271    /// Return whether the tangle is within the given sync threshold.
272    pub fn is_synced_threshold(&self, threshold: u32) -> bool {
273        // TODO reduce to one atomic value ?
274        *self.get_solid_milestone_index() >= self.get_latest_milestone_index().saturating_sub(threshold)
275    }
276
277    /// Return whether the tangle is fully confirmed.
278    pub fn is_confirmed(&self) -> bool {
279        // TODO reduce to one atomic value ?
280        self.is_confirmed_threshold(CONFIRMED_THRESHOLD)
281    }
282
283    /// Return whether the tangle is within the given confirmation threshold.
284    pub fn is_confirmed_threshold(&self, threshold: u32) -> bool {
285        // TODO reduce to one atomic value ?
286        *self.get_confirmed_milestone_index() >= self.get_latest_milestone_index().saturating_sub(threshold)
287    }
288
289    /// Get the milestone index associated with the given solid entry point.
290    pub async fn get_solid_entry_point_index(&self, sep: &SolidEntryPoint) -> Option<MilestoneIndex> {
291        self.solid_entry_points.lock().await.get(sep).copied()
292    }
293
294    /// Add the given solid entry point to the given milestone index.
295    pub async fn add_solid_entry_point(&self, sep: SolidEntryPoint, index: MilestoneIndex) {
296        self.solid_entry_points.lock().await.insert(sep, index);
297    }
298
299    /// Returns a copy of all solid entry points.
300    pub async fn get_solid_entry_points(&self) -> HashMap<SolidEntryPoint, MilestoneIndex> {
301        self.solid_entry_points.lock().await.clone()
302    }
303
304    /// Removes the given solid entry point from the set of solid entry points.
305    pub async fn remove_solid_entry_point(&self, sep: &SolidEntryPoint) {
306        self.solid_entry_points.lock().await.remove(sep);
307    }
308
309    /// Clear all solid entry points.
310    pub async fn clear_solid_entry_points(&self) {
311        self.solid_entry_points.lock().await.clear();
312    }
313
314    /// Replaces all solid entry points.
315    pub async fn replace_solid_entry_points(
316        &self,
317        new_seps: impl IntoIterator<Item = (SolidEntryPoint, MilestoneIndex)>,
318    ) {
319        let mut seps = self.solid_entry_points.lock().await;
320        seps.clear();
321        seps.extend(new_seps);
322    }
323
324    /// Returns whether the message associated with given solid entry point is a solid entry point.
325    pub async fn is_solid_entry_point(&self, id: &MessageId) -> bool {
326        self.solid_entry_points
327            .lock()
328            .await
329            .contains_key(SolidEntryPoint::ref_cast(id))
330    }
331
332    /// Returns whether the message associated with the given message ID is solid.
333    pub async fn is_solid_message(&self, id: &MessageId) -> bool {
334        if self.is_solid_entry_point(id).await {
335            true
336        } else {
337            self.get_metadata(id)
338                .await
339                .map(|metadata| metadata.flags().is_solid())
340                .unwrap_or(false)
341        }
342    }
343
344    /// Get the oldest milestone root snapshot index.
345    pub async fn omrsi(&self, id: &MessageId) -> Option<IndexId> {
346        match self.solid_entry_points.lock().await.get(SolidEntryPoint::ref_cast(id)) {
347            Some(sep) => Some(IndexId::new(*sep, *id)),
348            None => match self.get_metadata(id).await {
349                Some(metadata) => metadata.omrsi(),
350                None => None,
351            },
352        }
353    }
354
355    /// Get the youngest milestone root snapshot index.
356    pub async fn ymrsi(&self, id: &MessageId) -> Option<IndexId> {
357        match self.solid_entry_points.lock().await.get(SolidEntryPoint::ref_cast(id)) {
358            Some(sep) => Some(IndexId::new(*sep, *id)),
359            None => match self.get_metadata(id).await {
360                Some(metadata) => metadata.ymrsi(),
361                None => None,
362            },
363        }
364    }
365
366    /// Insert the given message ID and parents as a tip.
367    pub async fn insert_tip(&self, message_id: MessageId, parents: Vec<MessageId>) {
368        self.tip_pool.lock().await.insert(self, message_id, parents).await;
369    }
370
371    /// Update tip scores.
372    pub async fn update_tip_scores(&self) {
373        self.tip_pool.lock().await.update_scores(self).await;
374    }
375
376    /// Return messages that require approving.
377    pub async fn get_messages_to_approve(&self) -> Option<Vec<MessageId>> {
378        self.tip_pool.lock().await.choose_non_lazy_tips()
379    }
380
381    /// Reduce tips.
382    pub async fn reduce_tips(&self) {
383        self.tip_pool.lock().await.reduce_tips();
384    }
385
386    /// Return the number of non-lazy tips.
387    pub async fn non_lazy_tips_num(&self) -> usize {
388        self.tip_pool.lock().await.non_lazy_tips().len()
389    }
390
391    /// Change the maximum number of entries to store in the cache.
392    fn resize(&self, len: usize) {
393        self.max_len.store(len, Ordering::Relaxed);
394    }
395
396    /// Return a reference to the storage  used by this tangle.
397    fn storage(&self) -> &B {
398        &self.storage
399    }
400
401    async fn insert_inner(
402        &self,
403        message_id: MessageId,
404        message: Message,
405        metadata: MessageMetadata,
406        prevent_eviction: bool,
407    ) -> Option<MessageRef> {
408        let mut vertex = self.vertices.get_mut_or_empty(message_id).await;
409
410        if prevent_eviction {
411            vertex.prevent_eviction();
412        }
413
414        let msg = if vertex.message().is_some() {
415            drop(vertex);
416            None
417        } else {
418            let parents = message.parents().clone();
419
420            vertex.insert_message_and_metadata(message, metadata);
421            let msg = vertex.message().cloned();
422            drop(vertex);
423
424            // Insert children for parents
425            for &parent in parents.iter() {
426                self.vertices.get_mut_or_empty(parent).await.add_child(message_id);
427            }
428
429            msg
430        };
431
432        self.perform_eviction().await;
433
434        msg
435    }
436
437    async fn get_inner(&self, message_id: &MessageId) -> Option<impl DerefMut<Target = Vertex> + '_> {
438        self.vertices.get_mut(message_id).await
439    }
440
441    /// Get the data of a vertex associated with the given `message_id`.
442    async fn get_with<R>(&self, message_id: &MessageId, f: impl FnOnce(&mut Vertex) -> R) -> Option<R> {
443        let exists = self.pull_message(message_id, true).await;
444
445        self.get_inner(message_id).await.map(|mut v| {
446            if exists {
447                v.allow_eviction();
448            }
449            f(&mut v)
450        })
451    }
452
453    /// Get the data of a vertex associated with the given `message_id`.
454    pub async fn get(&self, message_id: &MessageId) -> Option<MessageRef> {
455        self.get_with(message_id, |v| v.message().cloned()).await.flatten()
456    }
457
458    async fn contains_inner(&self, message_id: &MessageId) -> bool {
459        self.vertices
460            .get(message_id)
461            .await
462            .map_or(false, |v| v.message().is_some())
463    }
464
465    /// Returns whether the message is stored in the Tangle.
466    pub async fn contains(&self, message_id: &MessageId) -> bool {
467        self.contains_inner(message_id).await || self.pull_message(message_id, false).await
468    }
469
470    /// Get the metadata of a vertex associated with the given `message_id`.
471    pub async fn get_metadata(&self, message_id: &MessageId) -> Option<MessageMetadata> {
472        self.get_with(message_id, |v| v.metadata().cloned()).await.flatten()
473    }
474
475    /// Get the metadata of a vertex associated with the given `message_id`.
476    pub async fn get_vertex(&self, message_id: &MessageId) -> Option<impl Deref<Target = Vertex> + '_> {
477        let exists = self.pull_message(message_id, true).await;
478
479        self.get_inner(message_id).await.map(|mut v| {
480            if exists {
481                v.allow_eviction();
482            }
483            v
484        })
485    }
486
487    /// Updates the metadata of a vertex.
488    pub async fn update_metadata<R, Update>(&self, message_id: &MessageId, update: Update) -> Option<R>
489    where
490        Update: FnOnce(&mut MessageMetadata) -> R,
491    {
492        let exists = self.pull_message(message_id, true).await;
493
494        if let Some(mut vertex) = self.vertices.get_mut(message_id).await {
495            if exists {
496                vertex.allow_eviction();
497            }
498            let r = vertex.metadata_mut().map(update);
499
500            if let Some((msg, meta)) = vertex.message_and_metadata() {
501                let (msg, meta) = ((&**msg).clone(), *meta);
502
503                drop(vertex);
504
505                self.storage_insert(*message_id, msg, meta)
506                    .unwrap_or_else(|e| info!("Failed to update metadata for message {:?}", e));
507            }
508
509            r
510        } else {
511            None
512        }
513    }
514
515    async fn children_inner(&self, message_id: &MessageId) -> Option<impl Deref<Target = Vec<MessageId>> + '_> {
516        struct Wrapper<'a> {
517            children: Vec<MessageId>,
518            phantom: PhantomData<&'a ()>,
519        }
520
521        impl<'a> Deref for Wrapper<'a> {
522            type Target = Vec<MessageId>;
523
524            fn deref(&self) -> &Self::Target {
525                &self.children
526            }
527        }
528
529        let vertex = self
530            .vertices
531            .get(message_id)
532            .await
533            // Skip approver lists that are not exhaustive
534            .filter(|v| v.children_exhaustive());
535
536        let children = match vertex {
537            Some(vertex) => {
538                let children = vertex.children().to_vec();
539                drop(vertex);
540                children
541            }
542            None => {
543                drop(vertex);
544                let to_insert = match self.storage.fetch(message_id) {
545                    Err(e) => {
546                        info!("Failed to update approvers for message message {:?}", e);
547                        Vec::new()
548                    }
549                    Ok(None) => Vec::new(),
550                    Ok(Some(approvers)) => approvers,
551                };
552
553                let mut vertex = self.vertices.get_mut_or_empty(*message_id).await;
554
555                // We've just fetched approvers from the database, so we have all the information available to us now.
556                // Therefore, the approvers list is exhaustive (i.e: it contains all knowledge we have).
557                vertex.set_exhaustive();
558
559                for child in to_insert {
560                    vertex.add_child(child);
561                }
562
563                let children = vertex.children().to_vec();
564                drop(vertex);
565                children
566            }
567        };
568
569        Some(Wrapper {
570            children,
571            phantom: PhantomData,
572        })
573    }
574
575    /// Returns the children of a vertex, if we know about them.
576    pub async fn get_children(&self, message_id: &MessageId) -> Option<Vec<MessageId>> {
577        // Effectively atomic
578        self.children_inner(message_id).await.map(|approvers| approvers.clone())
579    }
580
581    // Attempts to pull the message from the storage, returns true if successful.
582    async fn pull_message(&self, message_id: &MessageId, prevent_eviction: bool) -> bool {
583        let contains_now = if prevent_eviction {
584            self.vertices.get_mut(message_id).await.map_or(false, |mut v| {
585                if v.message().is_some() {
586                    v.prevent_eviction();
587                    true
588                } else {
589                    false
590                }
591            })
592        } else {
593            self.contains_inner(message_id).await
594        };
595
596        // If the tangle already contains the message, do no more work
597        if contains_now {
598            true
599        } else if let Ok(Some((msg, metadata))) = self.storage_get(message_id) {
600            self.insert_inner(*message_id, msg, metadata, prevent_eviction).await;
601
602            true
603        } else {
604            false
605        }
606    }
607
608    async fn perform_eviction(&self) {
609        let max_len = self.max_len.load(Ordering::Relaxed);
610        let max_eviction_retries = self.config.max_eviction_retries();
611
612        if self.vertices.len() > max_len {
613            while self.vertices.len() > ((1.0 - CACHE_THRESHOLD_FACTOR) * max_len as f64) as usize {
614                if self.vertices.pop_random(max_eviction_retries).await.is_none() {
615                    log::warn!(
616                        "could not perform cache eviction after {} attempts",
617                        max_eviction_retries
618                    );
619
620                    break;
621                }
622            }
623        }
624    }
625
626    fn storage_get(&self, id: &MessageId) -> Result<Option<(Message, MessageMetadata)>, B::Error> {
627        let msg = self.storage.fetch(id)?;
628        let meta = self.storage.fetch(id)?;
629
630        Ok(msg.zip(meta))
631    }
632
633    fn storage_insert(&self, id: MessageId, tx: Message, metadata: MessageMetadata) -> Result<(), B::Error> {
634        self.storage.insert(&id, &tx)?;
635        self.storage.insert(&id, &metadata)?;
636
637        Ok(())
638    }
639}