Skip to main content

tycho_collator/mempool/impls/
stub_impl.rs

1use std::collections::BTreeMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::Result;
8use async_trait::async_trait;
9use humantime::format_duration;
10use parking_lot::RwLock;
11use rand::Rng;
12use scopeguard::defer;
13use tycho_network::PeerId;
14use tycho_types::models::{ConsensusConfig, GenesisInfo, *};
15use tycho_types::prelude::*;
16
17use crate::mempool::{
18    DebugStateUpdateContext, DumpedAnchor, ExternalMessage, GetAnchorResult, MempoolAdapter,
19    MempoolAnchor, MempoolAnchorId, MempoolEventListener, StateUpdateContext,
20};
21use crate::tracing_targets;
22use crate::types::processed_upto::BlockSeqno;
23
/// Stub mempool adapter that produces anchors from background generator tasks
/// instead of a real mempool.
///
/// Generated anchors are stored in an in-memory cache and also pushed to the
/// registered listener.
pub struct MempoolAdapterStubImpl {
    /// Receives an `on_new_anchor` call for every anchor a generator produces.
    listener: Arc<dyn MempoolEventListener>,
    /// Anchors produced (or preloaded) so far, keyed by anchor id.
    anchors_cache: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
    /// Pacing switch for the generators: while `true` they wait four round
    /// intervals between anchors, while `false` they wait only 10 ms.
    sleep_between_anchors: AtomicBool,
}
29
30impl MempoolAdapterStubImpl {
31    pub fn with_stub_externals(
32        listener: Arc<dyn MempoolEventListener>,
33        now: Option<u64>,
34    ) -> Arc<Self> {
35        Self::with_generator(listener, |a| {
36            tokio::spawn(Self::stub_externals_generator(a, now));
37            Ok(())
38        })
39        .unwrap()
40    }
41
42    pub fn with_externals_from_dir(
43        listener: Arc<dyn MempoolEventListener>,
44        dir_path: impl AsRef<Path>,
45    ) -> Result<Arc<Self>> {
46        Self::with_generator(listener, move |a| {
47            let mut paths = std::fs::read_dir(dir_path)?
48                .map(|res| res.map(|e| e.path()))
49                .collect::<Result<Vec<_>, _>>()?;
50            paths.sort();
51
52            tokio::spawn(Self::file_externals_generator(a, paths));
53            Ok(())
54        })
55    }
56
57    fn with_generator<F>(listener: Arc<dyn MempoolEventListener>, start: F) -> Result<Arc<Self>>
58    where
59        F: FnOnce(Arc<Self>) -> Result<()>,
60    {
61        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "creating mempool adapter");
62
63        let adapter = Self {
64            listener,
65            anchors_cache: Arc::new(RwLock::new(BTreeMap::new())),
66            sleep_between_anchors: AtomicBool::new(true),
67        };
68
69        let adapter = Arc::new(adapter);
70
71        start(adapter.clone())?;
72
73        Ok(adapter)
74    }
75
76    #[allow(clippy::too_many_arguments)]
77    pub fn with_anchors_from_dump(
78        listener: Arc<dyn MempoolEventListener>,
79        now: Option<u64>,
80        dumped_anchors: Vec<DumpedAnchor>,
81    ) -> Result<Arc<Self>> {
82        Self::with_generator(listener.clone(), {
83            move |a| {
84                tokio::spawn(Self::anchors_generator(a, now, dumped_anchors));
85                Ok(())
86            }
87        })
88    }
89
90    #[tracing::instrument(skip_all)]
91    async fn stub_externals_generator(self: Arc<Self>, now: Option<u64>) {
92        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "started");
93        defer! {
94            tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "finished");
95        }
96
97        let mut prev_anchor_id = 0;
98        let start_anchor_id = prev_anchor_id + 1;
99        for anchor_id in start_anchor_id.. {
100            if self.sleep_between_anchors.load(Ordering::Acquire) {
101                tokio::time::sleep(make_round_interval() * 4).await;
102            } else {
103                tokio::time::sleep(Duration::from_millis(10)).await;
104            }
105
106            let mut anchor = make_stub_anchor(anchor_id, prev_anchor_id);
107            prev_anchor_id = anchor_id;
108
109            if let Some(now) = now {
110                anchor.chain_time += now;
111            }
112
113            let anchor = Arc::new(anchor);
114
115            self.anchors_cache.write().insert(anchor_id, anchor.clone());
116
117            tracing::debug!(
118                target: tracing_targets::MEMPOOL_ADAPTER,
119                anchor_id = anchor.id,
120                chain_time = anchor.chain_time,
121                externals = anchor.externals.len(),
122                "anchor added to cache",
123            );
124
125            self.listener.on_new_anchor(anchor).await.unwrap();
126        }
127    }
128
129    #[tracing::instrument(skip_all)]
130    async fn anchors_generator(
131        self: Arc<Self>,
132        now: Option<u64>,
133        dumped_anchors: Vec<DumpedAnchor>,
134    ) {
135        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "started");
136        defer! {
137            tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "finished");
138        }
139
140        let max_anchor_id = dumped_anchors
141            .iter()
142            .map(|a| a.id)
143            .max()
144            .unwrap_or_default();
145        // Preload dumped anchors into cache
146        {
147            let mut cache = self.anchors_cache.write();
148            for anchor in dumped_anchors {
149                tracing::debug!(
150                    target: tracing_targets::MEMPOOL_ADAPTER,
151                    anchor_id = anchor.id,
152                    chain_time = anchor.chain_time,
153                    externals = anchor.externals.len(),
154                    "anchor added to cache",
155                );
156
157                cache.insert(
158                    anchor.id,
159                    Arc::new(
160                        MempoolAnchor::try_from(anchor).expect("Can not parse anchor from dump"),
161                    ),
162                );
163            }
164        }
165
166        let mut prev_anchor_id = max_anchor_id;
167        let mut prev_chain_time = self
168            .anchors_cache
169            .read()
170            .get(&prev_anchor_id)
171            .map(|prev_anchor| prev_anchor.chain_time)
172            .or(now)
173            .unwrap_or_default();
174
175        for anchor_id in max_anchor_id + 1.. {
176            if self.sleep_between_anchors.load(Ordering::Acquire) {
177                tokio::time::sleep(make_round_interval() * 4).await;
178            } else {
179                tokio::time::sleep(Duration::from_millis(10)).await;
180            }
181
182            let anchor = make_empty_anchor(anchor_id, prev_anchor_id, prev_chain_time + 1336);
183
184            prev_anchor_id = anchor_id;
185            prev_chain_time = anchor.chain_time;
186
187            self.anchors_cache.write().insert(anchor_id, anchor.clone());
188
189            tracing::debug!(
190                target: tracing_targets::MEMPOOL_ADAPTER,
191                anchor_id = anchor.id,
192                chain_time = anchor.chain_time,
193                externals = anchor.externals.len(),
194                "anchor added to cache",
195            );
196
197            self.listener.on_new_anchor(anchor).await.unwrap();
198        }
199    }
200
201    #[tracing::instrument(skip_all)]
202    async fn file_externals_generator(self: Arc<Self>, paths: Vec<PathBuf>) {
203        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "started");
204        defer! {
205            tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "finished");
206        }
207
208        let mut iter = paths.into_iter();
209
210        let mut last_chain_time = 0;
211        let mut prev_anchor_id = 0;
212        for anchor_id in 1.. {
213            if self.sleep_between_anchors.load(Ordering::Acquire) {
214                tokio::time::sleep(make_round_interval() * 4).await;
215            } else {
216                tokio::time::sleep(Duration::from_millis(10)).await;
217            }
218
219            let anchor = 'anchor: {
220                if let Some(path) = iter.next() {
221                    match make_anchor_from_file(anchor_id, prev_anchor_id, &path) {
222                        Ok(anchor) => {
223                            prev_anchor_id = anchor_id;
224                            break 'anchor anchor;
225                        }
226                        Err(e) => {
227                            tracing::error!(
228                                target: tracing_targets::MEMPOOL_ADAPTER,
229                                anchor_id,
230                                prev_anchor_id,
231                                path = %path.display(),
232                                "failed to make anchor from file: {e:?}"
233                            );
234                        }
235                    }
236                }
237
238                make_empty_anchor(anchor_id, prev_anchor_id, last_chain_time + 1336)
239            };
240
241            last_chain_time = anchor.chain_time;
242            self.anchors_cache.write().insert(anchor_id, anchor.clone());
243
244            tracing::debug!(
245                target: tracing_targets::MEMPOOL_ADAPTER,
246                anchor_id = anchor.id,
247                chain_time = anchor.chain_time,
248                externals = anchor.externals.len(),
249                "anchor added to cache",
250            );
251
252            self.listener.on_new_anchor(anchor).await.unwrap();
253        }
254    }
255}
256
257#[async_trait]
258impl MempoolAdapter for MempoolAdapterStubImpl {
259    async fn handle_mc_state_update(&self, cx: Box<StateUpdateContext>) -> Result<()> {
260        tracing::info!(
261            target: tracing_targets::MEMPOOL_ADAPTER,
262            "STUB: Processing state update from mc block {}: {:?}",
263            cx.mc_block_id.as_short_id(), DebugStateUpdateContext(&cx),
264        );
265        Ok(())
266    }
267
268    async fn handle_signed_mc_block(&self, _mc_block_seqno: BlockSeqno) -> Result<()> {
269        Ok(())
270    }
271
272    async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
273        let mut last_attempt_at = None;
274        loop {
275            let Some(anchor) = self.anchors_cache.read().get(&anchor_id).cloned() else {
276                let last_anchor_id = self
277                    .anchors_cache
278                    .read()
279                    .last_key_value()
280                    .map_or(0, |(_, last_anchor)| last_anchor.id);
281                if last_anchor_id > anchor_id {
282                    return Ok(GetAnchorResult::NotExist);
283                } else {
284                    let delta = anchor_id.saturating_sub(last_anchor_id);
285                    if delta > 20 {
286                        self.sleep_between_anchors.store(false, Ordering::Release);
287                        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
288                            "sleep_between_anchors set to False because anchor_id {} ahead last {} on {} > 20",
289                            anchor_id, last_anchor_id, delta,
290                        );
291                        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
292                            "STUB: mempool return None because requested anchor_id {} ahead last {} on {} > 20",
293                            anchor_id, last_anchor_id, delta,
294                        );
295                        return Ok(GetAnchorResult::NotExist);
296                    } else if delta > 3 {
297                        self.sleep_between_anchors.store(false, Ordering::Release);
298                        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
299                            "sleep_between_anchors set to False because anchor_id {} ahead last {} on {} > 3",
300                            anchor_id, last_anchor_id, delta,
301                        );
302                    }
303                }
304
305                if last_attempt_at.is_none() {
306                    tracing::debug!(
307                        target: tracing_targets::MEMPOOL_ADAPTER,
308                        anchor_id,
309                        "There is no required anchor in cache. \
310                        STUB: Requested it from mempool. Waiting...",
311                    );
312                }
313
314                last_attempt_at = Some(Instant::now());
315                tokio::time::sleep(tokio::time::Duration::from_millis(1320)).await;
316                continue;
317            };
318
319            if !self.sleep_between_anchors.fetch_or(true, Ordering::AcqRel) {
320                tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
321                    "sleep_between_anchors set to True when requested was returned by anchor_id {}",
322                    anchor_id,
323                );
324            }
325
326            match last_attempt_at {
327                Some(last) => {
328                    tracing::debug!(
329                        target: tracing_targets::MEMPOOL_ADAPTER,
330                        anchor_id = anchor.id,
331                        elapsed = %format_duration(last.elapsed()),
332                        "STUB: Returned the anchor from mempool",
333                    );
334                }
335                None => {
336                    tracing::debug!(
337                        target: tracing_targets::MEMPOOL_ADAPTER,
338                        anchor_id = anchor.id,
339                        "Requested the anchor from the local cache",
340                    );
341                }
342            }
343
344            return Ok(GetAnchorResult::Exist(anchor));
345        }
346    }
347
348    async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
349        let range = (
350            std::ops::Bound::Excluded(prev_anchor_id),
351            std::ops::Bound::Unbounded,
352        );
353
354        let mut last_attempt_at = None;
355        loop {
356            let res = self
357                .anchors_cache
358                .read()
359                .range(range)
360                .next()
361                .map(|(_, v)| v.clone());
362
363            let Some(anchor) = res else {
364                let last_anchor_id = self
365                    .anchors_cache
366                    .read()
367                    .last_key_value()
368                    .map_or(0, |(_, last_anchor)| last_anchor.id);
369                let delta = prev_anchor_id.saturating_sub(last_anchor_id);
370                if delta >= 20 {
371                    tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
372                        "sleep_between_anchors set to False because prev_anchor_id {} ahead last {} on {} >= 20",
373                        prev_anchor_id, last_anchor_id, delta,
374                    );
375                    tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
376                        "STUB: mempool return None because prev_anchor_id {} ahead last {} on {} >= 20",
377                        prev_anchor_id, last_anchor_id, delta,
378                    );
379                    return Ok(GetAnchorResult::NotExist);
380                } else if delta >= 3 {
381                    self.sleep_between_anchors.store(false, Ordering::Release);
382                    tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
383                        "sleep_between_anchors set to False because prev_anchor_id {} ahead last {} on {} >= 3",
384                        prev_anchor_id, last_anchor_id, delta,
385                    );
386                }
387
388                if last_attempt_at.is_none() {
389                    tracing::debug!(
390                        target: tracing_targets::MEMPOOL_ADAPTER,
391                        prev_anchor_id,
392                        "There is no next anchor in cache. \
393                        STUB: Requested it from mempool. Waiting...",
394                    );
395                }
396
397                last_attempt_at = Some(Instant::now());
398                tokio::time::sleep(tokio::time::Duration::from_millis(1320)).await;
399                continue;
400            };
401
402            if !self.sleep_between_anchors.fetch_or(true, Ordering::AcqRel) {
403                tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER,
404                    "sleep_between_anchors set to True when next was returned after prev_anchor_id {}",
405                    prev_anchor_id,
406                );
407            }
408
409            match last_attempt_at {
410                Some(last) => {
411                    tracing::debug!(
412                        target: tracing_targets::MEMPOOL_ADAPTER,
413                        prev_anchor_id,
414                        anchor_id = anchor.id,
415                        elapsed = %format_duration(last.elapsed()),
416                        "STUB: Returned the next anchor from mempool",
417                    );
418                }
419                None => {
420                    tracing::debug!(
421                        target: tracing_targets::MEMPOOL_ADAPTER,
422                        prev_anchor_id,
423                        anchor_id = anchor.id,
424                        "Requested the next anchor from the local cache",
425                    );
426                }
427            }
428
429            return Ok(GetAnchorResult::Exist(anchor));
430        }
431    }
432
433    fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
434        let mut anchors_cache = self.anchors_cache.write();
435        anchors_cache.retain(|anchor_id, _| anchor_id >= &before_anchor_id);
436        Ok(())
437    }
438
439    fn accept_external(&self, _message: bytes::Bytes) {
440        panic!("accept_external not implemented for stub");
441    }
442
443    async fn update_delayed_config(
444        &self,
445        _consensus_config: Option<&ConsensusConfig>,
446        _genesis_info: &GenesisInfo,
447    ) -> Result<()> {
448        panic!("update_delayed_config not implemented for stub");
449    }
450}
451
452pub(crate) fn make_empty_anchor(
453    id: MempoolAnchorId,
454    prev_id: MempoolAnchorId,
455    chain_time: u64,
456) -> Arc<MempoolAnchor> {
457    Arc::new(MempoolAnchor {
458        id,
459        prev_id: Some(prev_id),
460        author: PeerId(Default::default()),
461        chain_time,
462        externals: vec![],
463    })
464}
465
466pub(crate) fn make_stub_anchor(id: MempoolAnchorId, prev_id: MempoolAnchorId) -> MempoolAnchor {
467    let chain_time = id as u64 * 1736 % 1000000000;
468
469    let externals_count = (chain_time % 10) as u32;
470
471    let mut externals = vec![];
472    for i in 0..externals_count {
473        let addr_hash_base = i % 6 + 1;
474        let dst = IntAddr::Std(StdAddr::new(
475            if i > 0 && i % 3 == 0 { -1 } else { 0 },
476            HashBytes([addr_hash_base.try_into().unwrap(); 32]),
477        ));
478        externals.push(Arc::new(make_stub_external(id, chain_time, i, dst)));
479    }
480
481    MempoolAnchor {
482        id,
483        prev_id: Some(prev_id),
484        author: PeerId(Default::default()),
485        chain_time,
486        externals,
487    }
488}
489
490pub(crate) fn make_stub_external(
491    anchor_id: MempoolAnchorId,
492    chain_time: u64,
493    msg_idx: u32,
494    dst: IntAddr,
495) -> ExternalMessage {
496    let body = {
497        let mut builder = CellBuilder::new();
498        builder.store_u32(anchor_id).unwrap();
499        builder.store_u64(chain_time).unwrap();
500        builder.store_u32(msg_idx).unwrap();
501        builder.build().unwrap()
502    };
503
504    let info = ExtInMsgInfo {
505        dst,
506        ..Default::default()
507    };
508
509    let cell = CellBuilder::build_from(Message {
510        info: MsgInfo::ExtIn(info.clone()),
511        init: None,
512        body: body.as_slice().unwrap(),
513        layout: None,
514    })
515    .unwrap();
516
517    ExternalMessage { cell, info }
518}
519
520pub(crate) fn make_anchor_from_file(
521    id: MempoolAnchorId,
522    prev_id: MempoolAnchorId,
523    path: &Path,
524) -> Result<Arc<MempoolAnchor>> {
525    let data = std::fs::read_to_string(path)?;
526
527    let file_name = path.file_name().unwrap().to_str().unwrap();
528    tracing::debug!(
529        target: tracing_targets::MEMPOOL_ADAPTER,
530        file_name,
531        "read external from file"
532    );
533
534    let chain_time = file_name.parse().unwrap();
535
536    let cell = Boc::decode_base64(data)?;
537    let message: Message<'_> = cell.parse()?;
538
539    let mut externals = vec![];
540    if let MsgInfo::ExtIn(info) = message.info {
541        externals.push(Arc::new(ExternalMessage { cell, info }));
542    }
543
544    Ok(Arc::new(MempoolAnchor {
545        id,
546        prev_id: Some(prev_id),
547        author: PeerId(Default::default()),
548        chain_time,
549        externals,
550    }))
551}
552
553fn make_round_interval() -> Duration {
554    Duration::from_millis(rand::rng().random_range(240..340))
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Listener that only traces the anchors it is notified about.
    struct MempoolEventStubListener;

    #[async_trait]
    impl MempoolEventListener for MempoolEventStubListener {
        async fn on_new_anchor(&self, anchor: Arc<MempoolAnchor>) -> Result<()> {
            tracing::trace!(
                "MempoolEventStubListener: on_new_anchor event emitted for anchor \
                (id: {}, chain_time: {}, externals: {})",
                anchor.id,
                anchor.chain_time,
                anchor.externals.len(),
            );
            Ok(())
        }
    }

    /// Checks lookup by id, lookup of the next anchor and cache clearing
    /// against the stub externals generator.
    #[tokio::test]
    async fn test_stub_anchors_generator() -> Result<()> {
        tycho_util::test::init_logger("test_stub_anchors_generator", "trace");

        let listener = Arc::new(MempoolEventStubListener);
        let adapter = MempoolAdapterStubImpl::with_stub_externals(listener, None);

        // try get existing anchor by id
        let by_id = adapter.get_anchor_by_id(3).await?;
        assert_eq!(by_id.anchor().map(|a| a.id), Some(3));

        // try get next anchor after (id: 3)
        let next = adapter.get_next_anchor(3).await?;
        assert_eq!(next.anchor().map(|a| a.id), Some(4));

        // try get next anchor after (id: 5), will wait some time
        let next = adapter.get_next_anchor(5).await?;
        assert_eq!(next.anchor().map(|a| a.id), Some(6));

        // test clear anchors cache
        adapter.clear_anchors_cache(6)?;
        for cleared_id in [3, 4] {
            let cleared = adapter.get_anchor_by_id(cleared_id).await?;
            assert!(cleared.anchor().is_none());
        }
        let kept = adapter.get_anchor_by_id(6).await?;
        assert_eq!(kept.anchor().map(|a| a.id), Some(6));

        Ok(())
    }
}
610}