// tycho_consensus/storage/adapter_store.rs
use std::collections::BTreeMap;
2use std::num::NonZeroU32;
3use std::ops::RangeInclusive;
4use std::sync::Arc;
5
6use ahash::{HashMapExt, HashSetExt};
7use anyhow::{Context, Result};
8use bumpalo::Bump;
9use itertools::Itertools;
10use tycho_util::metrics::HistogramGuard;
11use tycho_util::{FastHashMap, FastHashSet};
12use weedb::rocksdb::{ReadOptions, WriteBatch};
13
14use super::{AnchorFlags, MempoolStore};
15use crate::effects::AltFormat;
16use crate::models::{
17 CommitHistoryPart, Point, PointInfo, PointKey, PointRestore, PointRestoreSelect, PointStatus,
18 PointStatusValidated, Round,
19};
20use crate::storage::MempoolDb;
21
/// Cloneable handle over the shared [`MempoolDb`], used by the mempool
/// adapter to expand committed anchor history and persist commit status.
#[derive(Clone)]
pub struct MempoolAdapterStore(Arc<MempoolDb>);
24
25impl MempoolAdapterStore {
26 pub fn new(mempool_db: Arc<MempoolDb>) -> Self {
27 Self(mempool_db)
28 }
29
30 pub fn expand_anchor_history<'b>(
32 &self,
33 anchor: &PointInfo,
34 history: &[PointInfo],
35 bump: &'b Bump,
36 set_committed_in_db: bool,
37 ) -> Vec<&'b [u8]> {
38 fn context(anchor: &PointInfo, history: &[PointInfo]) -> String {
39 format!(
40 "anchor {:?} history {} points rounds [{}..{}]",
41 anchor.id().alt(),
42 history.len(),
43 history.first().map(|i| i.round().0).unwrap_or_default(),
44 history.last().map(|i| i.round().0).unwrap_or_default()
45 )
46 }
47
48 let payloads = if history.is_empty() {
49 tracing::warn!(
51 "anchor {:?} has empty history, it's ok only for anchor at DAG bottom round \
52 immediately after an unrecoverable gap",
53 anchor.id().alt()
54 );
55 Vec::new()
56 } else {
57 self.load_payload(history, bump)
58 .with_context(|| context(anchor, history))
59 .expect("DB expand anchor history")
60 };
61 if set_committed_in_db {
62 self.set_committed_db(anchor, history)
63 .with_context(|| context(anchor, history))
64 .expect("DB set committed");
65 }
66 payloads
67 }
68
69 pub fn expand_anchor_history_arena_size(&self, history: &[PointInfo]) -> usize {
70 let payload_bytes =
71 (history.iter()).fold(0, |acc, info| acc + info.payload_bytes() as usize);
72 let keys_bytes = history.len() * PointKey::MAX_TL_BYTES;
73 payload_bytes + keys_bytes
74 }
75
76 fn load_payload<'b>(&self, history: &[PointInfo], bump: &'b Bump) -> Result<Vec<&'b [u8]>> {
77 let _call_duration =
78 HistogramGuard::begin("tycho_mempool_store_expand_anchor_history_time");
79 let mut key_buf = [0; _];
80 let mut keys = FastHashSet::<&'b [u8]>::with_capacity(history.len());
81 for info in history {
82 info.key().fill(&mut key_buf);
83 keys.insert(bump.alloc_slice_copy(&key_buf));
84 }
85
86 let mut opt = ReadOptions::default();
87
88 let first = (history.first()).context("anchor history must not be empty")?;
89 PointKey::fill_prefix(first.round(), &mut key_buf);
90 opt.set_iterate_lower_bound(key_buf);
91
92 let last = history.last().context("anchor history must not be empty")?;
93 PointKey::fill_prefix(last.round().next(), &mut key_buf);
94 opt.set_iterate_upper_bound(key_buf);
95
96 let db = self.0.db.rocksdb();
97 let points_cf = self.0.db.points.cf();
98
99 let mut found = FastHashMap::with_capacity(history.len());
100 let mut iter = db.raw_iterator_cf_opt(&points_cf, opt);
101 iter.seek_to_first();
102
103 let mut total_payload_items = 0;
104 while iter.valid() {
105 let key = iter.key().context("history iter invalidated on key")?;
106 if let Some(key) = keys.take(key) {
107 let bytes = iter.value().context("history iter invalidated on value")?;
108 let payload =
109 Point::read_payload_from_tl_bytes(bytes, bump).context("deserialize point")?;
110
111 total_payload_items += payload.len();
112 if found.insert(key, payload).is_some() {
113 let full_point =
115 Point::from_bytes(bytes.to_vec()).context("deserialize point")?;
116 panic!("iter read non-unique point {:?}", full_point.info().id())
117 }
118 }
119 if keys.is_empty() {
120 break;
121 }
122 iter.next();
123 }
124 iter.status().context("anchor history iter is not ok")?;
125 drop(iter);
126
127 anyhow::ensure!(
128 keys.is_empty(),
129 "{} history points were not found id db:\n{}",
130 keys.len(),
131 keys.into_iter().map(PointKey::format_loose).join(",\n")
132 );
133 anyhow::ensure!(found.len() == history.len(), "stored point key collision");
134
135 let mut result = Vec::with_capacity(total_payload_items);
136 for info in history {
137 info.key().fill(&mut key_buf);
138 let payload = found
139 .remove(&key_buf[..])
140 .with_context(|| PointKey::format_loose(&key_buf))
141 .context("key was searched in db but was not found")?;
142 for msg in payload {
143 result.push(msg);
144 }
145 }
146
147 Ok(result)
148 }
149
    /// Persists the committed status of `anchor` and its `history` points in
    /// a single write batch.
    ///
    /// The anchor is merged with the `Used` anchor flag; each history point
    /// is merged with a `CommitHistoryPart` recording the anchor round and
    /// its position (`seq_no`) inside the history.
    fn set_committed_db(&self, anchor: &PointInfo, history: &[PointInfo]) -> Result<()> {
        let _call_duration = HistogramGuard::begin("tycho_mempool_store_set_committed_status_time");

        // Round 0 holds no points, so an anchor's round is always non-zero.
        let anchor_round =
            NonZeroU32::try_from(anchor.round().0).context("zero round cannot have points")?;

        // Reusable TL key buffer; array length is inferred from usage.
        let mut key_buf = [0; _];

        let db = self.0.db.rocksdb();
        let status_cf = self.0.db.points_status.cf();
        // One status record for the anchor plus one per history point.
        let mut batch = WriteBatch::with_capacity_bytes(
            PointStatusValidated::size_hint() * (1 + history.len()),
        );

        let mut status = PointStatusValidated::default();
        // Encoding buffer is reused: it is cleared after every merge below.
        let mut status_encoded = Vec::with_capacity(PointStatusValidated::size_hint());

        // Mark the anchor point itself as a used anchor.
        status.anchor_flags = AnchorFlags::Used;
        status.write_to(&mut status_encoded);

        anchor.key().fill(&mut key_buf);
        batch.merge_cf(&status_cf, &key_buf[..], &status_encoded);
        status_encoded.clear();

        // Fresh status (no anchor flags) for the history points.
        status = PointStatusValidated::default();
        for (index, info) in history.iter().enumerate() {
            status.committed = Some(CommitHistoryPart {
                anchor_round,
                // seq_no preserves the commit order inside this anchor's history.
                seq_no: u32::try_from(index).context("anchor has insanely long history")?,
            });

            status.write_to(&mut status_encoded);

            info.key().fill(&mut key_buf);
            batch.merge_cf(&status_cf, &key_buf[..], &status_encoded);
            status_encoded.clear();
        }

        Ok(db.write(batch)?)
    }
190
191 pub fn load_history_since(
192 &self,
193 bottom_round: u32,
194 ) -> BTreeMap<u32, (PointInfo, Vec<PointInfo>)> {
195 let store = MempoolStore::new(self.0.clone());
196
197 let Some(last_db_round) = store.last_round() else {
198 tracing::warn!("Mempool db is empty");
199 return Default::default();
200 };
201
202 let mut anchors = FastHashMap::new();
203
204 let mut items = store
205 .load_restore(&RangeInclusive::new(Round(bottom_round), last_db_round))
206 .into_iter()
207 .filter_map(|item| match item {
208 PointRestoreSelect::Ready(PointRestore::Validated(info, status)) => {
209 Some((info, status))
210 }
211 _ => None,
212 })
213 .inspect(|(info, status)| {
214 if status.anchor_flags.contains(AnchorFlags::Used) {
215 anchors.insert(info.round(), info.clone());
216 }
217 })
218 .filter_map(|(info, status)| status.committed.map(|committed| (committed, info)))
219 .collect::<Vec<_>>();
220
221 items.sort_unstable_by_key(|(committed, _)| (committed.anchor_round, committed.seq_no));
222
223 let mut by_anchor_round = BTreeMap::new();
224
225 let grouped = items
227 .into_iter()
228 .chunk_by(|(committed, _)| Round(committed.anchor_round.get()));
229
230 for (anchor_round, group) in &grouped {
231 let mut keyed_vec = group.collect::<Vec<_>>();
232 keyed_vec.sort_unstable_by_key(|(committed, _)| committed.seq_no);
234 let point_vec = keyed_vec.into_iter().map(|(_, info)| info);
235 match anchors.remove(&anchor_round) {
236 Some(anchor) => {
237 by_anchor_round.insert(anchor_round.0, (anchor.clone(), point_vec.collect()));
238 }
239 None => {
240 tracing::error!(
241 anchor = anchor_round.0,
242 "cannot reproduce history: no anchor point"
243 );
244 }
245 }
246 }
247 by_anchor_round
248 }
249}