Skip to main content

tycho_consensus/storage/
adapter_store.rs

1use std::collections::BTreeMap;
2use std::num::NonZeroU32;
3use std::ops::RangeInclusive;
4use std::sync::Arc;
5
6use ahash::{HashMapExt, HashSetExt};
7use anyhow::{Context, Result};
8use bumpalo::Bump;
9use itertools::Itertools;
10use tycho_util::metrics::HistogramGuard;
11use tycho_util::{FastHashMap, FastHashSet};
12use weedb::rocksdb::{ReadOptions, WriteBatch};
13
14use super::{AnchorFlags, MempoolStore};
15use crate::effects::AltFormat;
16use crate::models::{
17    CommitHistoryPart, Point, PointInfo, PointKey, PointRestore, PointRestoreSelect, PointStatus,
18    PointStatusValidated, Round,
19};
20use crate::storage::MempoolDb;
21
22#[derive(Clone)]
23pub struct MempoolAdapterStore(Arc<MempoolDb>);
24
25impl MempoolAdapterStore {
26    pub fn new(mempool_db: Arc<MempoolDb>) -> Self {
27        Self(mempool_db)
28    }
29
30    /// Next must call [`Self::set_committed`] for GC as watch notification is deferred
31    pub fn expand_anchor_history<'b>(
32        &self,
33        anchor: &PointInfo,
34        history: &[PointInfo],
35        bump: &'b Bump,
36        set_committed_in_db: bool,
37    ) -> Vec<&'b [u8]> {
38        fn context(anchor: &PointInfo, history: &[PointInfo]) -> String {
39            format!(
40                "anchor {:?} history {} points rounds [{}..{}]",
41                anchor.id().alt(),
42                history.len(),
43                history.first().map(|i| i.round().0).unwrap_or_default(),
44                history.last().map(|i| i.round().0).unwrap_or_default()
45            )
46        }
47
48        let payloads = if history.is_empty() {
49            // history is checked at the end of DAG commit, leave traces in case its broken
50            tracing::warn!(
51                "anchor {:?} has empty history, it's ok only for anchor at DAG bottom round \
52                 immediately after an unrecoverable gap",
53                anchor.id().alt()
54            );
55            Vec::new()
56        } else {
57            self.load_payload(history, bump)
58                .with_context(|| context(anchor, history))
59                .expect("DB expand anchor history")
60        };
61        if set_committed_in_db {
62            self.set_committed_db(anchor, history)
63                .with_context(|| context(anchor, history))
64                .expect("DB set committed");
65        }
66        payloads
67    }
68
69    pub fn expand_anchor_history_arena_size(&self, history: &[PointInfo]) -> usize {
70        let payload_bytes =
71            (history.iter()).fold(0, |acc, info| acc + info.payload_bytes() as usize);
72        let keys_bytes = history.len() * PointKey::MAX_TL_BYTES;
73        payload_bytes + keys_bytes
74    }
75
76    fn load_payload<'b>(&self, history: &[PointInfo], bump: &'b Bump) -> Result<Vec<&'b [u8]>> {
77        let _call_duration =
78            HistogramGuard::begin("tycho_mempool_store_expand_anchor_history_time");
79        let mut key_buf = [0; _];
80        let mut keys = FastHashSet::<&'b [u8]>::with_capacity(history.len());
81        for info in history {
82            info.key().fill(&mut key_buf);
83            keys.insert(bump.alloc_slice_copy(&key_buf));
84        }
85
86        let mut opt = ReadOptions::default();
87
88        let first = (history.first()).context("anchor history must not be empty")?;
89        PointKey::fill_prefix(first.round(), &mut key_buf);
90        opt.set_iterate_lower_bound(key_buf);
91
92        let last = history.last().context("anchor history must not be empty")?;
93        PointKey::fill_prefix(last.round().next(), &mut key_buf);
94        opt.set_iterate_upper_bound(key_buf);
95
96        let db = self.0.db.rocksdb();
97        let points_cf = self.0.db.points.cf();
98
99        let mut found = FastHashMap::with_capacity(history.len());
100        let mut iter = db.raw_iterator_cf_opt(&points_cf, opt);
101        iter.seek_to_first();
102
103        let mut total_payload_items = 0;
104        while iter.valid() {
105            let key = iter.key().context("history iter invalidated on key")?;
106            if let Some(key) = keys.take(key) {
107                let bytes = iter.value().context("history iter invalidated on value")?;
108                let payload =
109                    Point::read_payload_from_tl_bytes(bytes, bump).context("deserialize point")?;
110
111                total_payload_items += payload.len();
112                if found.insert(key, payload).is_some() {
113                    // we panic thus we don't care about performance
114                    let full_point =
115                        Point::from_bytes(bytes.to_vec()).context("deserialize point")?;
116                    panic!("iter read non-unique point {:?}", full_point.info().id())
117                }
118            }
119            if keys.is_empty() {
120                break;
121            }
122            iter.next();
123        }
124        iter.status().context("anchor history iter is not ok")?;
125        drop(iter);
126
127        anyhow::ensure!(
128            keys.is_empty(),
129            "{} history points were not found id db:\n{}",
130            keys.len(),
131            keys.into_iter().map(PointKey::format_loose).join(",\n")
132        );
133        anyhow::ensure!(found.len() == history.len(), "stored point key collision");
134
135        let mut result = Vec::with_capacity(total_payload_items);
136        for info in history {
137            info.key().fill(&mut key_buf);
138            let payload = found
139                .remove(&key_buf[..])
140                .with_context(|| PointKey::format_loose(&key_buf))
141                .context("key was searched in db but was not found")?;
142            for msg in payload {
143                result.push(msg);
144            }
145        }
146
147        Ok(result)
148    }
149
150    fn set_committed_db(&self, anchor: &PointInfo, history: &[PointInfo]) -> Result<()> {
151        let _call_duration = HistogramGuard::begin("tycho_mempool_store_set_committed_status_time");
152
153        let anchor_round =
154            NonZeroU32::try_from(anchor.round().0).context("zero round cannot have points")?;
155
156        let mut key_buf = [0; _];
157
158        let db = self.0.db.rocksdb();
159        let status_cf = self.0.db.points_status.cf();
160        let mut batch = WriteBatch::with_capacity_bytes(
161            PointStatusValidated::size_hint() * (1 + history.len()),
162        );
163
164        let mut status = PointStatusValidated::default();
165        let mut status_encoded = Vec::with_capacity(PointStatusValidated::size_hint());
166
167        status.anchor_flags = AnchorFlags::Used;
168        status.write_to(&mut status_encoded);
169
170        anchor.key().fill(&mut key_buf);
171        batch.merge_cf(&status_cf, &key_buf[..], &status_encoded);
172        status_encoded.clear();
173
174        status = PointStatusValidated::default();
175        for (index, info) in history.iter().enumerate() {
176            status.committed = Some(CommitHistoryPart {
177                anchor_round,
178                seq_no: u32::try_from(index).context("anchor has insanely long history")?,
179            });
180
181            status.write_to(&mut status_encoded);
182
183            info.key().fill(&mut key_buf);
184            batch.merge_cf(&status_cf, &key_buf[..], &status_encoded);
185            status_encoded.clear();
186        }
187
188        Ok(db.write(batch)?)
189    }
190
191    pub fn load_history_since(
192        &self,
193        bottom_round: u32,
194    ) -> BTreeMap<u32, (PointInfo, Vec<PointInfo>)> {
195        let store = MempoolStore::new(self.0.clone());
196
197        let Some(last_db_round) = store.last_round() else {
198            tracing::warn!("Mempool db is empty");
199            return Default::default();
200        };
201
202        let mut anchors = FastHashMap::new();
203
204        let mut items = store
205            .load_restore(&RangeInclusive::new(Round(bottom_round), last_db_round))
206            .into_iter()
207            .filter_map(|item| match item {
208                PointRestoreSelect::Ready(PointRestore::Validated(info, status)) => {
209                    Some((info, status))
210                }
211                _ => None,
212            })
213            .inspect(|(info, status)| {
214                if status.anchor_flags.contains(AnchorFlags::Used) {
215                    anchors.insert(info.round(), info.clone());
216                }
217            })
218            .filter_map(|(info, status)| status.committed.map(|committed| (committed, info)))
219            .collect::<Vec<_>>();
220
221        items.sort_unstable_by_key(|(committed, _)| (committed.anchor_round, committed.seq_no));
222
223        let mut by_anchor_round = BTreeMap::new();
224
225        // should not allocate as all items are sorted
226        let grouped = items
227            .into_iter()
228            .chunk_by(|(committed, _)| Round(committed.anchor_round.get()));
229
230        for (anchor_round, group) in &grouped {
231            let mut keyed_vec = group.collect::<Vec<_>>();
232            // should be a no-op
233            keyed_vec.sort_unstable_by_key(|(committed, _)| committed.seq_no);
234            let point_vec = keyed_vec.into_iter().map(|(_, info)| info);
235            match anchors.remove(&anchor_round) {
236                Some(anchor) => {
237                    by_anchor_round.insert(anchor_round.0, (anchor.clone(), point_vec.collect()));
238                }
239                None => {
240                    tracing::error!(
241                        anchor = anchor_round.0,
242                        "cannot reproduce history: no anchor point"
243                    );
244                }
245            }
246        }
247        by_anchor_round
248    }
249}