1use std::collections::BTreeMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::Result;
8use async_trait::async_trait;
9use humantime::format_duration;
10use parking_lot::RwLock;
11use rand::Rng;
12use scopeguard::defer;
13use tycho_network::PeerId;
14use tycho_types::models::{ConsensusConfig, GenesisInfo, *};
15use tycho_types::prelude::*;
16
17use crate::mempool::{
18 DebugStateUpdateContext, DumpedAnchor, ExternalMessage, GetAnchorResult, MempoolAdapter,
19 MempoolAnchor, MempoolAnchorId, MempoolEventListener, StateUpdateContext,
20};
21use crate::tracing_targets;
22use crate::types::processed_upto::BlockSeqno;
23
24pub struct MempoolAdapterStubImpl {
25 listener: Arc<dyn MempoolEventListener>,
26 anchors_cache: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
27 sleep_between_anchors: AtomicBool,
28}
29
30impl MempoolAdapterStubImpl {
31 pub fn with_stub_externals(
32 listener: Arc<dyn MempoolEventListener>,
33 now: Option<u64>,
34 ) -> Arc<Self> {
35 Self::with_generator(listener, |a| {
36 tokio::spawn(Self::stub_externals_generator(a, now));
37 Ok(())
38 })
39 .unwrap()
40 }
41
42 pub fn with_externals_from_dir(
43 listener: Arc<dyn MempoolEventListener>,
44 dir_path: impl AsRef<Path>,
45 ) -> Result<Arc<Self>> {
46 Self::with_generator(listener, move |a| {
47 let mut paths = std::fs::read_dir(dir_path)?
48 .map(|res| res.map(|e| e.path()))
49 .collect::<Result<Vec<_>, _>>()?;
50 paths.sort();
51
52 tokio::spawn(Self::file_externals_generator(a, paths));
53 Ok(())
54 })
55 }
56
57 fn with_generator<F>(listener: Arc<dyn MempoolEventListener>, start: F) -> Result<Arc<Self>>
58 where
59 F: FnOnce(Arc<Self>) -> Result<()>,
60 {
61 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "creating mempool adapter");
62
63 let adapter = Self {
64 listener,
65 anchors_cache: Arc::new(RwLock::new(BTreeMap::new())),
66 sleep_between_anchors: AtomicBool::new(true),
67 };
68
69 let adapter = Arc::new(adapter);
70
71 start(adapter.clone())?;
72
73 Ok(adapter)
74 }
75
76 #[allow(clippy::too_many_arguments)]
77 pub fn with_anchors_from_dump(
78 listener: Arc<dyn MempoolEventListener>,
79 now: Option<u64>,
80 dumped_anchors: Vec<DumpedAnchor>,
81 ) -> Result<Arc<Self>> {
82 Self::with_generator(listener.clone(), {
83 move |a| {
84 tokio::spawn(Self::anchors_generator(a, now, dumped_anchors));
85 Ok(())
86 }
87 })
88 }
89
90 #[tracing::instrument(skip_all)]
91 async fn stub_externals_generator(self: Arc<Self>, now: Option<u64>) {
92 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "started");
93 defer! {
94 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "finished");
95 }
96
97 let mut prev_anchor_id = 0;
98 let start_anchor_id = prev_anchor_id + 1;
99 for anchor_id in start_anchor_id.. {
100 if self.sleep_between_anchors.load(Ordering::Acquire) {
101 tokio::time::sleep(make_round_interval() * 4).await;
102 } else {
103 tokio::time::sleep(Duration::from_millis(10)).await;
104 }
105
106 let mut anchor = make_stub_anchor(anchor_id, prev_anchor_id);
107 prev_anchor_id = anchor_id;
108
109 if let Some(now) = now {
110 anchor.chain_time += now;
111 }
112
113 let anchor = Arc::new(anchor);
114
115 self.anchors_cache.write().insert(anchor_id, anchor.clone());
116
117 tracing::debug!(
118 target: tracing_targets::MEMPOOL_ADAPTER,
119 anchor_id = anchor.id,
120 chain_time = anchor.chain_time,
121 externals = anchor.externals.len(),
122 "anchor added to cache",
123 );
124
125 self.listener.on_new_anchor(anchor).await.unwrap();
126 }
127 }
128
129 #[tracing::instrument(skip_all)]
130 async fn anchors_generator(
131 self: Arc<Self>,
132 now: Option<u64>,
133 dumped_anchors: Vec<DumpedAnchor>,
134 ) {
135 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "started");
136 defer! {
137 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "finished");
138 }
139
140 let max_anchor_id = dumped_anchors
141 .iter()
142 .map(|a| a.id)
143 .max()
144 .unwrap_or_default();
145 {
147 let mut cache = self.anchors_cache.write();
148 for anchor in dumped_anchors {
149 tracing::debug!(
150 target: tracing_targets::MEMPOOL_ADAPTER,
151 anchor_id = anchor.id,
152 chain_time = anchor.chain_time,
153 externals = anchor.externals.len(),
154 "anchor added to cache",
155 );
156
157 cache.insert(
158 anchor.id,
159 Arc::new(
160 MempoolAnchor::try_from(anchor).expect("Can not parse anchor from dump"),
161 ),
162 );
163 }
164 }
165
166 let mut prev_anchor_id = max_anchor_id;
167 let mut prev_chain_time = self
168 .anchors_cache
169 .read()
170 .get(&prev_anchor_id)
171 .map(|prev_anchor| prev_anchor.chain_time)
172 .or(now)
173 .unwrap_or_default();
174
175 for anchor_id in max_anchor_id + 1.. {
176 if self.sleep_between_anchors.load(Ordering::Acquire) {
177 tokio::time::sleep(make_round_interval() * 4).await;
178 } else {
179 tokio::time::sleep(Duration::from_millis(10)).await;
180 }
181
182 let anchor = make_empty_anchor(anchor_id, prev_anchor_id, prev_chain_time + 1336);
183
184 prev_anchor_id = anchor_id;
185 prev_chain_time = anchor.chain_time;
186
187 self.anchors_cache.write().insert(anchor_id, anchor.clone());
188
189 tracing::debug!(
190 target: tracing_targets::MEMPOOL_ADAPTER,
191 anchor_id = anchor.id,
192 chain_time = anchor.chain_time,
193 externals = anchor.externals.len(),
194 "anchor added to cache",
195 );
196
197 self.listener.on_new_anchor(anchor).await.unwrap();
198 }
199 }
200
201 #[tracing::instrument(skip_all)]
202 async fn file_externals_generator(self: Arc<Self>, paths: Vec<PathBuf>) {
203 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "started");
204 defer! {
205 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "finished");
206 }
207
208 let mut iter = paths.into_iter();
209
210 let mut last_chain_time = 0;
211 let mut prev_anchor_id = 0;
212 for anchor_id in 1.. {
213 if self.sleep_between_anchors.load(Ordering::Acquire) {
214 tokio::time::sleep(make_round_interval() * 4).await;
215 } else {
216 tokio::time::sleep(Duration::from_millis(10)).await;
217 }
218
219 let anchor = 'anchor: {
220 if let Some(path) = iter.next() {
221 match make_anchor_from_file(anchor_id, prev_anchor_id, &path) {
222 Ok(anchor) => {
223 prev_anchor_id = anchor_id;
224 break 'anchor anchor;
225 }
226 Err(e) => {
227 tracing::error!(
228 target: tracing_targets::MEMPOOL_ADAPTER,
229 anchor_id,
230 prev_anchor_id,
231 path = %path.display(),
232 "failed to make anchor from file: {e:?}"
233 );
234 }
235 }
236 }
237
238 make_empty_anchor(anchor_id, prev_anchor_id, last_chain_time + 1336)
239 };
240
241 last_chain_time = anchor.chain_time;
242 self.anchors_cache.write().insert(anchor_id, anchor.clone());
243
244 tracing::debug!(
245 target: tracing_targets::MEMPOOL_ADAPTER,
246 anchor_id = anchor.id,
247 chain_time = anchor.chain_time,
248 externals = anchor.externals.len(),
249 "anchor added to cache",
250 );
251
252 self.listener.on_new_anchor(anchor).await.unwrap();
253 }
254 }
255}
256
257#[async_trait]
258impl MempoolAdapter for MempoolAdapterStubImpl {
259 async fn handle_mc_state_update(&self, cx: Box<StateUpdateContext>) -> Result<()> {
260 tracing::info!(
261 target: tracing_targets::MEMPOOL_ADAPTER,
262 "STUB: Processing state update from mc block {}: {:?}",
263 cx.mc_block_id.as_short_id(), DebugStateUpdateContext(&cx),
264 );
265 Ok(())
266 }
267
268 async fn handle_signed_mc_block(&self, _mc_block_seqno: BlockSeqno) -> Result<()> {
269 Ok(())
270 }
271
272 async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
273 let mut last_attempt_at = None;
274 loop {
275 let Some(anchor) = self.anchors_cache.read().get(&anchor_id).cloned() else {
276 let last_anchor_id = self
277 .anchors_cache
278 .read()
279 .last_key_value()
280 .map_or(0, |(_, last_anchor)| last_anchor.id);
281 if last_anchor_id > anchor_id {
282 return Ok(GetAnchorResult::NotExist);
283 } else {
284 let delta = anchor_id.saturating_sub(last_anchor_id);
285 if delta > 20 {
286 self.sleep_between_anchors.store(false, Ordering::Release);
287 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
288 "sleep_between_anchors set to False because anchor_id {} ahead last {} on {} > 20",
289 anchor_id, last_anchor_id, delta,
290 );
291 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
292 "STUB: mempool return None because requested anchor_id {} ahead last {} on {} > 20",
293 anchor_id, last_anchor_id, delta,
294 );
295 return Ok(GetAnchorResult::NotExist);
296 } else if delta > 3 {
297 self.sleep_between_anchors.store(false, Ordering::Release);
298 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
299 "sleep_between_anchors set to False because anchor_id {} ahead last {} on {} > 3",
300 anchor_id, last_anchor_id, delta,
301 );
302 }
303 }
304
305 if last_attempt_at.is_none() {
306 tracing::debug!(
307 target: tracing_targets::MEMPOOL_ADAPTER,
308 anchor_id,
309 "There is no required anchor in cache. \
310 STUB: Requested it from mempool. Waiting...",
311 );
312 }
313
314 last_attempt_at = Some(Instant::now());
315 tokio::time::sleep(tokio::time::Duration::from_millis(1320)).await;
316 continue;
317 };
318
319 if !self.sleep_between_anchors.fetch_or(true, Ordering::AcqRel) {
320 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
321 "sleep_between_anchors set to True when requested was returned by anchor_id {}",
322 anchor_id,
323 );
324 }
325
326 match last_attempt_at {
327 Some(last) => {
328 tracing::debug!(
329 target: tracing_targets::MEMPOOL_ADAPTER,
330 anchor_id = anchor.id,
331 elapsed = %format_duration(last.elapsed()),
332 "STUB: Returned the anchor from mempool",
333 );
334 }
335 None => {
336 tracing::debug!(
337 target: tracing_targets::MEMPOOL_ADAPTER,
338 anchor_id = anchor.id,
339 "Requested the anchor from the local cache",
340 );
341 }
342 }
343
344 return Ok(GetAnchorResult::Exist(anchor));
345 }
346 }
347
348 async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
349 let range = (
350 std::ops::Bound::Excluded(prev_anchor_id),
351 std::ops::Bound::Unbounded,
352 );
353
354 let mut last_attempt_at = None;
355 loop {
356 let res = self
357 .anchors_cache
358 .read()
359 .range(range)
360 .next()
361 .map(|(_, v)| v.clone());
362
363 let Some(anchor) = res else {
364 let last_anchor_id = self
365 .anchors_cache
366 .read()
367 .last_key_value()
368 .map_or(0, |(_, last_anchor)| last_anchor.id);
369 let delta = prev_anchor_id.saturating_sub(last_anchor_id);
370 if delta >= 20 {
371 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
372 "sleep_between_anchors set to False because prev_anchor_id {} ahead last {} on {} >= 20",
373 prev_anchor_id, last_anchor_id, delta,
374 );
375 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
376 "STUB: mempool return None because prev_anchor_id {} ahead last {} on {} >= 20",
377 prev_anchor_id, last_anchor_id, delta,
378 );
379 return Ok(GetAnchorResult::NotExist);
380 } else if delta >= 3 {
381 self.sleep_between_anchors.store(false, Ordering::Release);
382 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
383 "sleep_between_anchors set to False because prev_anchor_id {} ahead last {} on {} >= 3",
384 prev_anchor_id, last_anchor_id, delta,
385 );
386 }
387
388 if last_attempt_at.is_none() {
389 tracing::debug!(
390 target: tracing_targets::MEMPOOL_ADAPTER,
391 prev_anchor_id,
392 "There is no next anchor in cache. \
393 STUB: Requested it from mempool. Waiting...",
394 );
395 }
396
397 last_attempt_at = Some(Instant::now());
398 tokio::time::sleep(tokio::time::Duration::from_millis(1320)).await;
399 continue;
400 };
401
402 if !self.sleep_between_anchors.fetch_or(true, Ordering::AcqRel) {
403 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
404 "sleep_between_anchors set to True when next was returned after prev_anchor_id {}",
405 prev_anchor_id,
406 );
407 }
408
409 match last_attempt_at {
410 Some(last) => {
411 tracing::debug!(
412 target: tracing_targets::MEMPOOL_ADAPTER,
413 prev_anchor_id,
414 anchor_id = anchor.id,
415 elapsed = %format_duration(last.elapsed()),
416 "STUB: Returned the next anchor from mempool",
417 );
418 }
419 None => {
420 tracing::debug!(
421 target: tracing_targets::MEMPOOL_ADAPTER,
422 prev_anchor_id,
423 anchor_id = anchor.id,
424 "Requested the next anchor from the local cache",
425 );
426 }
427 }
428
429 return Ok(GetAnchorResult::Exist(anchor));
430 }
431 }
432
433 fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
434 let mut anchors_cache = self.anchors_cache.write();
435 anchors_cache.retain(|anchor_id, _| anchor_id >= &before_anchor_id);
436 Ok(())
437 }
438
439 fn accept_external(&self, _message: bytes::Bytes) {
440 panic!("accept_external not implemented for stub");
441 }
442
443 async fn update_delayed_config(
444 &self,
445 _consensus_config: Option<&ConsensusConfig>,
446 _genesis_info: &GenesisInfo,
447 ) -> Result<()> {
448 panic!("update_delayed_config not implemented for stub");
449 }
450}
451
452pub(crate) fn make_empty_anchor(
453 id: MempoolAnchorId,
454 prev_id: MempoolAnchorId,
455 chain_time: u64,
456) -> Arc<MempoolAnchor> {
457 Arc::new(MempoolAnchor {
458 id,
459 prev_id: Some(prev_id),
460 author: PeerId(Default::default()),
461 chain_time,
462 externals: vec![],
463 })
464}
465
466pub(crate) fn make_stub_anchor(id: MempoolAnchorId, prev_id: MempoolAnchorId) -> MempoolAnchor {
467 let chain_time = id as u64 * 1736 % 1000000000;
468
469 let externals_count = (chain_time % 10) as u32;
470
471 let mut externals = vec![];
472 for i in 0..externals_count {
473 let addr_hash_base = i % 6 + 1;
474 let dst = IntAddr::Std(StdAddr::new(
475 if i > 0 && i % 3 == 0 { -1 } else { 0 },
476 HashBytes([addr_hash_base.try_into().unwrap(); 32]),
477 ));
478 externals.push(Arc::new(make_stub_external(id, chain_time, i, dst)));
479 }
480
481 MempoolAnchor {
482 id,
483 prev_id: Some(prev_id),
484 author: PeerId(Default::default()),
485 chain_time,
486 externals,
487 }
488}
489
490pub(crate) fn make_stub_external(
491 anchor_id: MempoolAnchorId,
492 chain_time: u64,
493 msg_idx: u32,
494 dst: IntAddr,
495) -> ExternalMessage {
496 let body = {
497 let mut builder = CellBuilder::new();
498 builder.store_u32(anchor_id).unwrap();
499 builder.store_u64(chain_time).unwrap();
500 builder.store_u32(msg_idx).unwrap();
501 builder.build().unwrap()
502 };
503
504 let info = ExtInMsgInfo {
505 dst,
506 ..Default::default()
507 };
508
509 let cell = CellBuilder::build_from(Message {
510 info: MsgInfo::ExtIn(info.clone()),
511 init: None,
512 body: body.as_slice().unwrap(),
513 layout: None,
514 })
515 .unwrap();
516
517 ExternalMessage { cell, info }
518}
519
520pub(crate) fn make_anchor_from_file(
521 id: MempoolAnchorId,
522 prev_id: MempoolAnchorId,
523 path: &Path,
524) -> Result<Arc<MempoolAnchor>> {
525 let data = std::fs::read_to_string(path)?;
526
527 let file_name = path.file_name().unwrap().to_str().unwrap();
528 tracing::debug!(
529 target: tracing_targets::MEMPOOL_ADAPTER,
530 file_name,
531 "read external from file"
532 );
533
534 let chain_time = file_name.parse().unwrap();
535
536 let cell = Boc::decode_base64(data)?;
537 let message: Message<'_> = cell.parse()?;
538
539 let mut externals = vec![];
540 if let MsgInfo::ExtIn(info) = message.info {
541 externals.push(Arc::new(ExternalMessage { cell, info }));
542 }
543
544 Ok(Arc::new(MempoolAnchor {
545 id,
546 prev_id: Some(prev_id),
547 author: PeerId(Default::default()),
548 chain_time,
549 externals,
550 }))
551}
552
553fn make_round_interval() -> Duration {
554 Duration::from_millis(rand::rng().random_range(240..340))
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Listener that only traces the anchors it receives.
    struct MempoolEventStubListener;

    #[async_trait]
    impl MempoolEventListener for MempoolEventStubListener {
        async fn on_new_anchor(&self, anchor: Arc<MempoolAnchor>) -> Result<()> {
            tracing::trace!(
                "MempoolEventStubListener: on_new_anchor event emitted for anchor \
                (id: {}, chain_time: {}, externals: {})",
                anchor.id,
                anchor.chain_time,
                anchor.externals.len(),
            );
            Ok(())
        }
    }

    /// Id of the anchor carried by `result`, if there is one.
    fn returned_id(result: &GetAnchorResult) -> Option<MempoolAnchorId> {
        result.anchor().map(|anchor| anchor.id)
    }

    #[tokio::test]
    async fn test_stub_anchors_generator() -> Result<()> {
        tycho_util::test::init_logger("test_stub_anchors_generator", "trace");

        let listener = Arc::new(MempoolEventStubListener);
        let adapter = MempoolAdapterStubImpl::with_stub_externals(listener, None);

        // Lookup by id waits until the generator has produced the anchor.
        let by_id = adapter.get_anchor_by_id(3).await?;
        assert_eq!(returned_id(&by_id), Some(3));

        // The next anchor after a cached one has the following id.
        let next = adapter.get_next_anchor(3).await?;
        assert_eq!(returned_id(&next), Some(4));

        let next = adapter.get_next_anchor(5).await?;
        assert_eq!(returned_id(&next), Some(6));

        // Anchors below the boundary are dropped, the boundary anchor itself is kept.
        adapter.clear_anchors_cache(6)?;
        for removed_id in [3, 4] {
            let removed = adapter.get_anchor_by_id(removed_id).await?;
            assert_eq!(returned_id(&removed), None);
        }

        let kept = adapter.get_anchor_by_id(6).await?;
        assert_eq!(returned_id(&kept), Some(6));

        Ok(())
    }
}