1use crate::{
5 config::TangleConfig,
6 metadata::{IndexId, MessageMetadata},
7 solid_entry_point::SolidEntryPoint,
8 storage::StorageBackend,
9 urts::UrtsTipPool,
10 vertex::Vertex,
11 vertices::Vertices,
12 MessageRef,
13};
14
15use bee_message::{
16 milestone::{Milestone, MilestoneIndex},
17 Message, MessageId,
18};
19use bee_runtime::resource::ResourceHandle;
20
21use hashbrown::HashMap;
22use log::info;
23use ref_cast::RefCast;
24use tokio::sync::Mutex;
25
26use std::{
27 marker::PhantomData,
28 ops::{Deref, DerefMut},
29 sync::atomic::{AtomicU32, AtomicUsize, Ordering},
30};
31
/// Initial value of the vertex-cache length limit (`max_len`) and the upper bound applied when
/// that limit is recomputed in `update_solid_milestone_index`.
const DEFAULT_CACHE_LEN: usize = 100_000;
/// Once the cache grows past `max_len`, eviction continues until at most
/// `(1 - CACHE_THRESHOLD_FACTOR) * max_len` vertices remain.
const CACHE_THRESHOLD_FACTOR: f64 = 0.1;
/// Threshold that `is_synced` passes to `is_synced_threshold`.
const SYNCED_THRESHOLD: u32 = 2;
/// Threshold that `is_confirmed` passes to `is_confirmed_threshold`.
const CONFIRMED_THRESHOLD: u32 = 2;
36
/// An in-memory cache of message vertices backed by a storage backend `B`, together with
/// milestone bookkeeping, solid entry points and a tip pool.
pub struct Tangle<B> {
    /// Configuration this tangle was created with.
    config: TangleConfig,
    /// Cached vertices, looked up by message id.
    vertices: Vertices,
    /// Number of cached vertices above which an eviction pass is started.
    max_len: AtomicUsize,
    /// Handle to the storage backend used for fetches and inserts.
    storage: ResourceHandle<B>,
    /// In-memory cache of milestones keyed by milestone index.
    milestones: Mutex<HashMap<MilestoneIndex, Milestone>>,
    /// Solid entry points and the milestone index associated with each.
    solid_entry_points: Mutex<HashMap<SolidEntryPoint, MilestoneIndex>>,
    /// Raw value of the latest milestone index.
    latest_milestone_index: AtomicU32,
    /// Raw value of the solid milestone index.
    solid_milestone_index: AtomicU32,
    /// Raw value of the confirmed milestone index.
    confirmed_milestone_index: AtomicU32,
    /// Raw value of the snapshot index.
    snapshot_index: AtomicU32,
    /// Raw value of the pruning index.
    pruning_index: AtomicU32,
    /// Raw value of the entry point index.
    entry_point_index: AtomicU32,
    /// Tip pool used for tip insertion, scoring and selection.
    tip_pool: Mutex<UrtsTipPool>,
}
53
54impl<B: StorageBackend> Tangle<B> {
55 pub fn new(config: TangleConfig, storage: ResourceHandle<B>) -> Self {
57 Self {
58 vertices: Vertices::new(config.num_partitions()),
59 max_len: AtomicUsize::new(DEFAULT_CACHE_LEN),
60 storage,
61 milestones: Default::default(),
62 solid_entry_points: Default::default(),
63 latest_milestone_index: Default::default(),
64 solid_milestone_index: Default::default(),
65 confirmed_milestone_index: Default::default(),
66 snapshot_index: Default::default(),
67 pruning_index: Default::default(),
68 entry_point_index: Default::default(),
69 tip_pool: Mutex::new(UrtsTipPool::new(&config)),
70 config,
71 }
72 }
73
74 pub async fn shutdown(self) {
76 }
78
79 pub fn config(&self) -> &TangleConfig {
81 &self.config
82 }
83
84 pub async fn insert(
86 &self,
87 message: Message,
88 message_id: MessageId,
89 metadata: MessageMetadata,
90 ) -> Option<MessageRef> {
91 let exists = self.pull_message(&message_id, true).await;
92
93 let msg = self.insert_inner(message_id, message.clone(), metadata, !exists).await;
94
95 self.vertices
96 .get_mut(&message_id)
97 .await
98 .expect("Just-inserted message is missing")
99 .allow_eviction();
100
101 if msg.is_some() {
102 for &parent in message.parents().iter() {
104 self.storage
105 .insert(&(parent, message_id), &())
106 .unwrap_or_else(|e| info!("Failed to update approvers for message {:?}", e));
107 }
108
109 self.storage_insert(message_id, message, metadata)
111 .unwrap_or_else(|e| info!("Failed to insert message {:?}", e));
112 }
113
114 msg
115 }
116
117 pub async fn add_milestone(&self, idx: MilestoneIndex, milestone: Milestone) {
119 self.update_metadata(milestone.message_id(), |metadata| {
121 metadata.flags_mut().set_milestone(true);
122 metadata.set_milestone_index(idx);
123 metadata.set_omrsi(IndexId::new(idx, *milestone.message_id()));
124 metadata.set_ymrsi(IndexId::new(idx, *milestone.message_id()));
125 })
126 .await;
127 self.storage()
128 .insert(&idx, &milestone)
129 .unwrap_or_else(|e| info!("Failed to insert message {:?}", e));
130 self.milestones.lock().await.insert(idx, milestone);
131 }
132
133 pub async fn remove_milestone(&self, index: MilestoneIndex) {
135 self.milestones.lock().await.remove(&index);
136 }
137
138 async fn pull_milestone(&self, idx: MilestoneIndex) -> Option<MessageId> {
139 if let Some(milestone) = self.storage().fetch(&idx).unwrap_or_else(|e| {
140 info!("Failed to insert message {:?}", e);
141 None
142 }) {
143 let message_id = *self
144 .milestones
145 .lock()
146 .await
147 .entry(idx)
148 .or_insert(milestone)
149 .message_id();
150
151 Some(message_id)
152 } else {
153 None
154 }
155 }
156
157 pub async fn get_milestone(&self, index: MilestoneIndex) -> Option<Milestone> {
159 self.milestones.lock().await.get(&index).cloned()
160 }
161
162 pub async fn get_milestone_message(&self, index: MilestoneIndex) -> Option<MessageRef> {
164 match self.get_milestone_message_id(index).await {
166 None => None,
167 Some(ref hash) => self.get(hash).await,
168 }
169 }
170
171 pub async fn get_milestone_message_id(&self, index: MilestoneIndex) -> Option<MessageId> {
173 let message_id = self.milestones.lock().await.get(&index).map(|m| *m.message_id());
174
175 match message_id {
177 Some(m) => Some(m),
178 None => Some(self.pull_milestone(index).await?),
179 }
180 }
181
182 pub async fn contains_milestone(&self, idx: MilestoneIndex) -> bool {
184 if self.milestones.lock().await.contains_key(&idx) {
186 return true;
187 }
188 self.pull_milestone(idx).await.is_some()
189 }
190
191 pub fn get_latest_milestone_index(&self) -> MilestoneIndex {
193 self.latest_milestone_index.load(Ordering::Relaxed).into()
194 }
195
196 pub fn update_latest_milestone_index(&self, new_index: MilestoneIndex) {
198 self.latest_milestone_index.store(*new_index, Ordering::Relaxed);
200 }
201
202 pub fn get_solid_milestone_index(&self) -> MilestoneIndex {
204 self.solid_milestone_index.load(Ordering::Relaxed).into()
205 }
206
207 pub fn update_solid_milestone_index(&self, new_index: MilestoneIndex) {
209 self.solid_milestone_index.store(*new_index, Ordering::Relaxed);
210
211 let new_len = ((1000.0 + self.get_sync_threshold() as f32 * 500.0) as usize)
213 .min(DEFAULT_CACHE_LEN)
214 .max(8192);
215 self.resize(new_len);
216 }
217
218 pub fn get_confirmed_milestone_index(&self) -> MilestoneIndex {
220 self.confirmed_milestone_index.load(Ordering::Relaxed).into()
221 }
222
223 pub fn update_confirmed_milestone_index(&self, new_index: MilestoneIndex) {
225 self.confirmed_milestone_index.store(*new_index, Ordering::Relaxed);
226 }
227
228 pub fn get_snapshot_index(&self) -> MilestoneIndex {
230 self.snapshot_index.load(Ordering::Relaxed).into()
231 }
232
233 pub fn update_snapshot_index(&self, new_index: MilestoneIndex) {
235 self.snapshot_index.store(*new_index, Ordering::Relaxed);
236 }
237
238 pub fn get_pruning_index(&self) -> MilestoneIndex {
240 self.pruning_index.load(Ordering::Relaxed).into()
241 }
242
243 pub fn update_pruning_index(&self, new_index: MilestoneIndex) {
245 self.pruning_index.store(*new_index, Ordering::Relaxed);
246 }
247
248 pub fn get_entry_point_index(&self) -> MilestoneIndex {
250 self.entry_point_index.load(Ordering::Relaxed).into()
251 }
252
253 pub fn update_entry_point_index(&self, new_index: MilestoneIndex) {
255 self.entry_point_index.store(*new_index, Ordering::Relaxed);
256 }
257
258 pub fn is_synced(&self) -> bool {
260 self.is_synced_threshold(SYNCED_THRESHOLD)
262 }
263
264 pub fn get_sync_threshold(&self) -> u32 {
266 self.get_latest_milestone_index()
268 .saturating_sub(*self.get_solid_milestone_index())
269 }
270
271 pub fn is_synced_threshold(&self, threshold: u32) -> bool {
273 *self.get_solid_milestone_index() >= self.get_latest_milestone_index().saturating_sub(threshold)
275 }
276
277 pub fn is_confirmed(&self) -> bool {
279 self.is_confirmed_threshold(CONFIRMED_THRESHOLD)
281 }
282
283 pub fn is_confirmed_threshold(&self, threshold: u32) -> bool {
285 *self.get_confirmed_milestone_index() >= self.get_latest_milestone_index().saturating_sub(threshold)
287 }
288
289 pub async fn get_solid_entry_point_index(&self, sep: &SolidEntryPoint) -> Option<MilestoneIndex> {
291 self.solid_entry_points.lock().await.get(sep).copied()
292 }
293
294 pub async fn add_solid_entry_point(&self, sep: SolidEntryPoint, index: MilestoneIndex) {
296 self.solid_entry_points.lock().await.insert(sep, index);
297 }
298
299 pub async fn get_solid_entry_points(&self) -> HashMap<SolidEntryPoint, MilestoneIndex> {
301 self.solid_entry_points.lock().await.clone()
302 }
303
304 pub async fn remove_solid_entry_point(&self, sep: &SolidEntryPoint) {
306 self.solid_entry_points.lock().await.remove(sep);
307 }
308
309 pub async fn clear_solid_entry_points(&self) {
311 self.solid_entry_points.lock().await.clear();
312 }
313
314 pub async fn replace_solid_entry_points(
316 &self,
317 new_seps: impl IntoIterator<Item = (SolidEntryPoint, MilestoneIndex)>,
318 ) {
319 let mut seps = self.solid_entry_points.lock().await;
320 seps.clear();
321 seps.extend(new_seps);
322 }
323
324 pub async fn is_solid_entry_point(&self, id: &MessageId) -> bool {
326 self.solid_entry_points
327 .lock()
328 .await
329 .contains_key(SolidEntryPoint::ref_cast(id))
330 }
331
332 pub async fn is_solid_message(&self, id: &MessageId) -> bool {
334 if self.is_solid_entry_point(id).await {
335 true
336 } else {
337 self.get_metadata(id)
338 .await
339 .map(|metadata| metadata.flags().is_solid())
340 .unwrap_or(false)
341 }
342 }
343
344 pub async fn omrsi(&self, id: &MessageId) -> Option<IndexId> {
346 match self.solid_entry_points.lock().await.get(SolidEntryPoint::ref_cast(id)) {
347 Some(sep) => Some(IndexId::new(*sep, *id)),
348 None => match self.get_metadata(id).await {
349 Some(metadata) => metadata.omrsi(),
350 None => None,
351 },
352 }
353 }
354
355 pub async fn ymrsi(&self, id: &MessageId) -> Option<IndexId> {
357 match self.solid_entry_points.lock().await.get(SolidEntryPoint::ref_cast(id)) {
358 Some(sep) => Some(IndexId::new(*sep, *id)),
359 None => match self.get_metadata(id).await {
360 Some(metadata) => metadata.ymrsi(),
361 None => None,
362 },
363 }
364 }
365
366 pub async fn insert_tip(&self, message_id: MessageId, parents: Vec<MessageId>) {
368 self.tip_pool.lock().await.insert(self, message_id, parents).await;
369 }
370
371 pub async fn update_tip_scores(&self) {
373 self.tip_pool.lock().await.update_scores(self).await;
374 }
375
376 pub async fn get_messages_to_approve(&self) -> Option<Vec<MessageId>> {
378 self.tip_pool.lock().await.choose_non_lazy_tips()
379 }
380
381 pub async fn reduce_tips(&self) {
383 self.tip_pool.lock().await.reduce_tips();
384 }
385
386 pub async fn non_lazy_tips_num(&self) -> usize {
388 self.tip_pool.lock().await.non_lazy_tips().len()
389 }
390
391 fn resize(&self, len: usize) {
393 self.max_len.store(len, Ordering::Relaxed);
394 }
395
396 fn storage(&self) -> &B {
398 &self.storage
399 }
400
401 async fn insert_inner(
402 &self,
403 message_id: MessageId,
404 message: Message,
405 metadata: MessageMetadata,
406 prevent_eviction: bool,
407 ) -> Option<MessageRef> {
408 let mut vertex = self.vertices.get_mut_or_empty(message_id).await;
409
410 if prevent_eviction {
411 vertex.prevent_eviction();
412 }
413
414 let msg = if vertex.message().is_some() {
415 drop(vertex);
416 None
417 } else {
418 let parents = message.parents().clone();
419
420 vertex.insert_message_and_metadata(message, metadata);
421 let msg = vertex.message().cloned();
422 drop(vertex);
423
424 for &parent in parents.iter() {
426 self.vertices.get_mut_or_empty(parent).await.add_child(message_id);
427 }
428
429 msg
430 };
431
432 self.perform_eviction().await;
433
434 msg
435 }
436
437 async fn get_inner(&self, message_id: &MessageId) -> Option<impl DerefMut<Target = Vertex> + '_> {
438 self.vertices.get_mut(message_id).await
439 }
440
441 async fn get_with<R>(&self, message_id: &MessageId, f: impl FnOnce(&mut Vertex) -> R) -> Option<R> {
443 let exists = self.pull_message(message_id, true).await;
444
445 self.get_inner(message_id).await.map(|mut v| {
446 if exists {
447 v.allow_eviction();
448 }
449 f(&mut v)
450 })
451 }
452
453 pub async fn get(&self, message_id: &MessageId) -> Option<MessageRef> {
455 self.get_with(message_id, |v| v.message().cloned()).await.flatten()
456 }
457
458 async fn contains_inner(&self, message_id: &MessageId) -> bool {
459 self.vertices
460 .get(message_id)
461 .await
462 .map_or(false, |v| v.message().is_some())
463 }
464
465 pub async fn contains(&self, message_id: &MessageId) -> bool {
467 self.contains_inner(message_id).await || self.pull_message(message_id, false).await
468 }
469
470 pub async fn get_metadata(&self, message_id: &MessageId) -> Option<MessageMetadata> {
472 self.get_with(message_id, |v| v.metadata().cloned()).await.flatten()
473 }
474
475 pub async fn get_vertex(&self, message_id: &MessageId) -> Option<impl Deref<Target = Vertex> + '_> {
477 let exists = self.pull_message(message_id, true).await;
478
479 self.get_inner(message_id).await.map(|mut v| {
480 if exists {
481 v.allow_eviction();
482 }
483 v
484 })
485 }
486
487 pub async fn update_metadata<R, Update>(&self, message_id: &MessageId, update: Update) -> Option<R>
489 where
490 Update: FnOnce(&mut MessageMetadata) -> R,
491 {
492 let exists = self.pull_message(message_id, true).await;
493
494 if let Some(mut vertex) = self.vertices.get_mut(message_id).await {
495 if exists {
496 vertex.allow_eviction();
497 }
498 let r = vertex.metadata_mut().map(update);
499
500 if let Some((msg, meta)) = vertex.message_and_metadata() {
501 let (msg, meta) = ((&**msg).clone(), *meta);
502
503 drop(vertex);
504
505 self.storage_insert(*message_id, msg, meta)
506 .unwrap_or_else(|e| info!("Failed to update metadata for message {:?}", e));
507 }
508
509 r
510 } else {
511 None
512 }
513 }
514
515 async fn children_inner(&self, message_id: &MessageId) -> Option<impl Deref<Target = Vec<MessageId>> + '_> {
516 struct Wrapper<'a> {
517 children: Vec<MessageId>,
518 phantom: PhantomData<&'a ()>,
519 }
520
521 impl<'a> Deref for Wrapper<'a> {
522 type Target = Vec<MessageId>;
523
524 fn deref(&self) -> &Self::Target {
525 &self.children
526 }
527 }
528
529 let vertex = self
530 .vertices
531 .get(message_id)
532 .await
533 .filter(|v| v.children_exhaustive());
535
536 let children = match vertex {
537 Some(vertex) => {
538 let children = vertex.children().to_vec();
539 drop(vertex);
540 children
541 }
542 None => {
543 drop(vertex);
544 let to_insert = match self.storage.fetch(message_id) {
545 Err(e) => {
546 info!("Failed to update approvers for message message {:?}", e);
547 Vec::new()
548 }
549 Ok(None) => Vec::new(),
550 Ok(Some(approvers)) => approvers,
551 };
552
553 let mut vertex = self.vertices.get_mut_or_empty(*message_id).await;
554
555 vertex.set_exhaustive();
558
559 for child in to_insert {
560 vertex.add_child(child);
561 }
562
563 let children = vertex.children().to_vec();
564 drop(vertex);
565 children
566 }
567 };
568
569 Some(Wrapper {
570 children,
571 phantom: PhantomData,
572 })
573 }
574
575 pub async fn get_children(&self, message_id: &MessageId) -> Option<Vec<MessageId>> {
577 self.children_inner(message_id).await.map(|approvers| approvers.clone())
579 }
580
581 async fn pull_message(&self, message_id: &MessageId, prevent_eviction: bool) -> bool {
583 let contains_now = if prevent_eviction {
584 self.vertices.get_mut(message_id).await.map_or(false, |mut v| {
585 if v.message().is_some() {
586 v.prevent_eviction();
587 true
588 } else {
589 false
590 }
591 })
592 } else {
593 self.contains_inner(message_id).await
594 };
595
596 if contains_now {
598 true
599 } else if let Ok(Some((msg, metadata))) = self.storage_get(message_id) {
600 self.insert_inner(*message_id, msg, metadata, prevent_eviction).await;
601
602 true
603 } else {
604 false
605 }
606 }
607
608 async fn perform_eviction(&self) {
609 let max_len = self.max_len.load(Ordering::Relaxed);
610 let max_eviction_retries = self.config.max_eviction_retries();
611
612 if self.vertices.len() > max_len {
613 while self.vertices.len() > ((1.0 - CACHE_THRESHOLD_FACTOR) * max_len as f64) as usize {
614 if self.vertices.pop_random(max_eviction_retries).await.is_none() {
615 log::warn!(
616 "could not perform cache eviction after {} attempts",
617 max_eviction_retries
618 );
619
620 break;
621 }
622 }
623 }
624 }
625
626 fn storage_get(&self, id: &MessageId) -> Result<Option<(Message, MessageMetadata)>, B::Error> {
627 let msg = self.storage.fetch(id)?;
628 let meta = self.storage.fetch(id)?;
629
630 Ok(msg.zip(meta))
631 }
632
633 fn storage_insert(&self, id: MessageId, tx: Message, metadata: MessageMetadata) -> Result<(), B::Error> {
634 self.storage.insert(&id, &tx)?;
635 self.storage.insert(&id, &metadata)?;
636
637 Ok(())
638 }
639}