//! Component responsible for data availability sampling of blocks whose headers
//! were already synchronized and announced on the Celestia network.
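//!
//! A minimal sketch of how this component is typically wired up (illustrative
//! only, not compiled; assumes `p2p`, `store`, `events` and `sampling_window`
//! were already constructed elsewhere):
//!
//! ```ignore
//! let daser = Daser::start(DaserArgs {
//!     p2p,
//!     store,
//!     event_pub: events.publisher(),
//!     sampling_window,
//!     concurrency_limit: DEFAULT_CONCURENCY_LIMIT,
//!     additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
//! })?;
//!
//! // ... the node runs, Daser samples new blocks as their headers are stored ...
//!
//! daser.stop();
//! daser.join().await;
//! ```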

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lumina_utils::executor::{spawn, JoinHandle};
use lumina_utils::time::{Instant, Interval};
use rand::Rng;
use tendermint::Time;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{BlockRanges, Store, StoreError};
use crate::utils::{OneshotSenderExt, TimeExt};

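// `MAX_SAMPLES_NEEDED` caps how many unique shares are sampled per block.
// `GET_SAMPLE_MIN_TIMEOUT` is the lower bound for a single `get_sample` request timeout.
// `PRUNER_THRESHOLD` is the number of prunable blocks at which Daser pauses sampling
// of the prunable area so that the Pruner can catch up.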
const MAX_SAMPLES_NEEDED: usize = 16;
const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
const PRUNER_THRESHOLD: u64 = 512;

pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;

type Result<T, E = DaserError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur in the `Daser` component.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The worker has died.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl From<oneshot::error::RecvError> for DaserError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        DaserError::ChannelClosedUnexpectedly
    }
}

/// Component responsible for data availability sampling of blocks from the network.
pub(crate) struct Daser {
    cmd_tx: mpsc::Sender<DaserCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
}

/// Arguments used to configure the [`Daser`].
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    /// Handler for the peer-to-peer messaging.
    pub(crate) p2p: Arc<P2p>,
    /// Headers storage.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Size of the sampling window.
    pub(crate) sampling_window: Duration,
    /// How many blocks can be data sampled at the same time.
    pub(crate) concurrency_limit: usize,
    /// How many additional blocks can be data sampled if they are from HeaderSub.
    pub(crate) additional_headersub_concurrency: usize,
}

#[derive(Debug)]
pub(crate) enum DaserCmd {
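    /// Update the highest block height that the Pruner considers prunable.
    ///
    /// Together with `UpdateNumberOfPrunableBlocks` this defines the "prunable
    /// area" used to ratelimit sampling in `schedule_next_sample_block`.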
    UpdateHighestPrunableHeight {
        value: u64,
    },

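    /// Update the number of blocks that the Pruner currently has to prune.
    ///
    /// When this value reaches `PRUNER_THRESHOLD`, sampling of blocks inside
    /// the prunable area is paused.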
    UpdateNumberOfPrunableBlocks {
        value: u64,
    },

    /// Used by Pruner to tell Daser about a block that is going to be pruned.
    /// Daser then replies with `true` if Pruner is allowed to do it.
    ///
    /// This is needed to avoid the following race condition:
    ///
    /// We have `Store` very tightly integrated with `beetswap::Multihasher`,
    /// and when Daser starts data sampling, the header of that block must stay
    /// in the `Store` until the data sampling is finished. This can be fixed
    /// only if we decouple `Store` from `beetswap::Multihasher`.
    ///
    /// However, even if we fix the above, a second problem arises: when Pruner
    /// removes the header and samples of an ongoing data sampling, how are we
    /// going to handle the incoming CIDs? We need to somehow make sure that
    /// Pruner removes them only after sampling is finished.
    ///
    /// After the above issues are fixed, this can be removed.
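    ///
    /// A sketch of the Pruner-side handshake (illustrative only; `prune_block`
    /// is a hypothetical helper and not part of this crate):
    ///
    /// ```ignore
    /// if daser.want_to_prune(height).await? {
    ///     // Daser removed the height from its queue and will not sample it.
    ///     prune_block(height).await?;
    /// }
    /// ```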
    WantToPrune {
        height: u64,
        respond_to: oneshot::Sender<bool>,
    },
}

impl Daser {
    /// Create and start the [`Daser`].
    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
    where
        S: Store + 'static,
    {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Daser stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalDaserError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        })
    }

    #[cfg(test)]
    pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let cancellation_token = CancellationToken::new();

        // Just a fake join_handle
        let join_handle = spawn(async {});

        let daser = Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        };

        let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };

        (daser, mock_handle)
    }

    /// Stop the worker.
    pub(crate) fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }

    /// Wait until worker is completely stopped.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| DaserError::WorkerDied)
    }

    pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
        let (tx, rx) = oneshot::channel();

        self.send_command(DaserCmd::WantToPrune {
            height,
            respond_to: tx,
        })
        .await?;

        Ok(rx.await?)
    }

    pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
            .await
    }

    pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
            .await
    }
}

impl Drop for Daser {
    fn drop(&mut self) {
        self.stop();
    }
}

struct Worker<S>
where
    S: Store + 'static,
{
    cmd_rx: mpsc::Receiver<DaserCmd>,
    cancellation_token: CancellationToken,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    max_samples_needed: usize,
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
    queue: BlockRanges,
    timed_out: BlockRanges,
    ongoing: BlockRanges,
    will_be_pruned: BlockRanges,
    sampling_window: Duration,
    concurrency_limit: usize,
    additional_headersub_concurency: usize,
    head_height: Option<u64>,
    highest_prunable_height: Option<u64>,
    num_of_prunable_blocks: u64,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: DaserArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<DaserCmd>,
    ) -> Result<Worker<S>> {
        Ok(Worker {
            cmd_rx,
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            timed_out: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            will_be_pruned: BlockRanges::default(),
            sampling_window: args.sampling_window,
            concurrency_limit: args.concurrency_limit,
            additional_headersub_concurency: args.additional_headersub_concurrency,
            head_height: None,
            highest_prunable_height: None,
            num_of_prunable_blocks: 0,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if the connection status changed before the watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
            }
        }
    }

    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if the connection status changed before the watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        // Workaround because `wait_new_head` is not cancel-safe.
        //
        // TODO: Only Syncer adds new headers to the store, so ideally
        // Syncer should inform Daser that new headers were added.
        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.update_queue().await?;

        let mut first_report = true;

        loop {
            // Start as many data samplings as we are allowed to.
            while self.schedule_next_sample_block().await? {}

            if first_report {
                self.report().await?;
                first_report = false;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => self.report().await?,
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
                Some(res) = self.sampling_futs.next() => {
                    // Beetswap only returns fatal errors that are not related
                    // to P2P or networking.
                    let (height, timed_out) = res?;

                    if timed_out {
                        self.timed_out.insert_relaxed(height..=height).expect("invalid height");
                    } else {
                        self.store.mark_as_sampled(height).await?;
                    }

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                },
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.update_queue().await?;
                }
            }
        }

        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.timed_out = BlockRanges::default();
        self.head_height = None;

        Ok(())
    }

    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let sampled = self.store.get_sampled_ranges().await?;

        info!(
            "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
            sampled, &self.ongoing,
        );

        Ok(())
    }

    async fn on_cmd(&mut self, cmd: DaserCmd) {
        match cmd {
            DaserCmd::WantToPrune { height, respond_to } => {
                let res = self.on_want_to_prune(height).await;
                respond_to.maybe_send(res);
            }
            DaserCmd::UpdateHighestPrunableHeight { value } => {
                self.highest_prunable_height = Some(value);
            }
            DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
                self.num_of_prunable_blocks = value;
            }
        }
    }

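    /// Decide whether the Pruner may remove the block at `height`.
    ///
    /// Returns `false` while that block is being sampled; otherwise the height
    /// is removed from the queue and remembered so it won't be re-queued.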
    async fn on_want_to_prune(&mut self, height: u64) -> bool {
        // Pruner should not remove headers that are related to an ongoing sampling.
        if self.ongoing.contains(height) {
            return false;
        }

        // Header will be pruned, so we remove it from the queue to avoid race conditions.
        self.queue
            .remove_relaxed(height..=height)
            .expect("invalid height");
        // We also make sure `update_queue` will not put it back.
        self.will_be_pruned
            .insert_relaxed(height..=height)
            .expect("invalid height");

        true
    }

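    /// Try to schedule data sampling of the next block from the queue.
    ///
    /// Returns `Ok(true)` if a block was scheduled, and `Ok(false)` when the
    /// queue is empty, the concurrency limit is reached, or the remaining
    /// blocks are outside of the sampling window.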
    async fn schedule_next_sample_block(&mut self) -> Result<bool> {
        // Schedule the most recent unsampled block.
        let header = loop {
            let Some(height) = self.queue.pop_head() else {
                return Ok(false);
            };

            let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
                && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
            {
                // If a block is in the prunable area and Pruner has a lot of blocks
                // to prune, then we pause sampling.
                0
            } else if height == self.head_height.unwrap_or(0) {
                // For the head we allow additional concurrency.
                self.concurrency_limit + self.additional_headersub_concurency
            } else {
                self.concurrency_limit
            };

            if self.sampling_futs.len() >= concurrency_limit {
                // If the concurrency limit is reached, then we pause sampling
                // and put back the height we previously popped.
                self.queue
                    .insert_relaxed(height..=height)
                    .expect("invalid height");
                return Ok(false);
            }

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    // Height was pruned and our queue is inconsistent.
                    // Repopulate queue and try again.
                    self.update_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height().value();
        let square_width = header.dah.square_width();

        // Make sure that the block is still in the sampling window.
        if !self.in_sampling_window(header.time()) {
            // As soon as we reach a block that is not in the sampling
            // window, the older ones won't be in it either.
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.timed_out
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(false);
        }

        // Select random shares to be sampled
        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        // Update the CID list before we start sampling, otherwise it's possible for us
        // to leak CIDs, causing associated blocks to never get cleaned from the blockstore.
        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;
        self.store.update_sampling_metadata(height, cids).await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();
        let sampling_window = self.sampling_window;

        // Schedule retrieval of the CIDs. This will be run later on in the `select!` loop.
        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            // We set the timeout high enough that it lasts until the block
            // goes out of the sampling window.
            let timeout = calc_timeout(header.time(), Time::now(), sampling_window);

            // Initialize all futures
            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p.get_sample(row, col, height, Some(timeout)).await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut sampling_timed_out = false;

            // Run futures to completion
            while let Some((row, column, res)) = futs.next().await {
                let timed_out = match res {
                    Ok(_) => false,
                    // Validation is done at the Bitswap level, through `ShwapMultihasher`.
                    // If the sample is not valid, it will never be delivered to us
                    // as the data of the CID. Because of that, the only signal
                    // that data sampling verification failed is the query timing out.
                    Err(P2pError::BitswapQueryTimeout) => true,
                    Err(e) => return Err(e.into()),
                };

                if timed_out {
                    sampling_timed_out = true;
                }

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    timed_out,
                });
            }

            event_pub.send(NodeEvent::SamplingResult {
                height,
                timed_out: sampling_timed_out,
                took: now.elapsed(),
            });

            Ok((height, sampling_timed_out))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(true)
    }

    /// Add to the queue the blocks that need to be sampled.
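    ///
    /// The queue is computed as the stored header ranges minus the ranges that
    /// were already sampled, timed out, are currently being sampled, or are
    /// about to be pruned.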
    async fn update_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let sampled = self.store.get_sampled_ranges().await?;

        self.head_height = stored.head();
        self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;

        Ok(())
    }

    /// Returns true if `time` is within the sampling window.
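    ///
    /// Headers with timestamps in the future are treated as being within the
    /// window; headers older than `sampling_window` are not.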
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        // Header is from the future! Thus, it is within the sampling window.
        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}

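/// Returns the timeout for a single sample request of a header created at
/// `header_time`: the time remaining until the block leaves the sampling
/// window, but never less than `GET_SAMPLE_MIN_TIMEOUT`.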
fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
    let sampling_window_end = now.saturating_sub(sampling_window);

    header_time
        .duration_since(sampling_window_end)
        .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
        .max(GET_SAMPLE_MIN_TIMEOUT)
}

/// Returns unique and random indexes that will be used for sampling.
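///
/// For example, with `MAX_SAMPLES_NEEDED = 16`, squares with at most 16 shares
/// (e.g. 2x2 or 4x4) are sampled in full, while larger squares get 16 random,
/// unique coordinates.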
fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
    let samples_in_block = usize::from(square_width).pow(2);

    // If the block has no more samples than `max_samples_needed`, we are going
    // to sample the whole block. Randomness is not needed for this.
    if samples_in_block <= max_samples_needed {
        return (0..square_width)
            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
            .collect();
    }

    let mut indexes = HashSet::with_capacity(max_samples_needed);
    let mut rng = rand::thread_rng();

    while indexes.len() < max_samples_needed {
        let row = rng.gen::<u16>() % square_width;
        let col = rng.gen::<u16>() % square_width;
        indexes.insert((row, col));
    }

    indexes
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{EventChannel, EventSubscriber};
    use crate::node::SAMPLING_WINDOW;
    use crate::p2p::shwap::convert_cid;
    use crate::p2p::P2pCmd;
    use crate::store::InMemoryStore;
    use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
    use crate::utils::OneshotResultSender;
    use bytes::BytesMut;
    use celestia_proto::bitswap::Block;
    use celestia_types::consts::appconsts::AppVersion;
    use celestia_types::sample::{Sample, SampleId};
    use celestia_types::test_utils::{generate_dummy_eds, ExtendedHeaderGenerator};
    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
    use cid::Cid;
    use lumina_utils::test_utils::async_test;
    use lumina_utils::time::sleep;
    use prost::Message;
    use std::collections::HashMap;
    use std::time::Duration;

    // Request number for which tests will simulate sampling timeout
    //
    // NOTE: The smallest block has 4 shares, so a 2nd request will always happen.
    const REQ_TIMEOUT_SHARE_NUM: usize = 2;

    #[async_test]
    async fn check_calc_timeout() {
        let now = Time::now();
        let sampling_window = Duration::from_secs(60);

        let header_time = now;
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(60)
        );

        let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(59)
        );

        let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(11)
        );

        let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(10)
        );

        // minimum timeout edge 1
        let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // minimum timeout edge 2
        let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // header outside of the sampling window
        let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // header from the "future"
        let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(61)
        );
    }

    #[async_test]
    async fn received_valid_samples() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 16, false).await;
    }

    #[async_test]
    async fn sampling_timeout() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, true).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
    }

    #[async_test]
    async fn backward_dasing() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        let mut edses = Vec::new();
        let mut headers = Vec::new();

        for _ in 0..20 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = gen.next_with_dah(dah);

            edses.push(eds);
            headers.push(header);
        }

        // Insert block headers 5 to 10
        store.insert(headers[4..=9].to_vec()).await.unwrap();

        // Sample block 10
        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;

        // Sample block 9
        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;

        // To avoid race conditions we wait a bit for block 8 to be scheduled
        sleep(Duration::from_millis(10)).await;

        // Insert block headers 16 to 20
        store.insert(headers[15..=19].to_vec()).await.unwrap();

        // To avoid race conditions we wait a bit for the new head (block 20) to be scheduled
        sleep(Duration::from_millis(10)).await;

        // Now Daser runs two concurrent data samplings: block 8 and block 20
        handle_concurrent_get_shwap_cid(
            &mut handle,
            [(8, &edses[9], false), (20, &edses[19], false)],
        )
        .await;

        // Sample block 19 and simulate a sampling timeout
        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;

        // Simulate disconnection
        handle.announce_all_peers_disconnected();

        // Daser may have scheduled block 18 already, so we need to reply to those requests.
        // For the sake of the test we reply with a bitswap timeout.
        while let Some(cmd) = handle.try_recv_cmd().await {
            match cmd {
                P2pCmd::GetShwapCid { respond_to, .. } => {
                    let _ = respond_to.send(Err(P2pError::BitswapQueryTimeout));
                }
                cmd => panic!("Unexpected command: {cmd:?}"),
            }
        }

        // We shouldn't have any other requests from Daser because it's in the connecting state.
        handle.expect_no_cmd().await;

        // Simulate that a peer connected
        handle.announce_peer_connected();

        // Because of the disconnection and the previous timeout of block 19, Daser will resample it
        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;

        // Sample blocks 18 down to 16
        for height in (16..=18).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        // Sample blocks 7 down to 5
        for height in (5..=7).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        handle.expect_no_cmd().await;

        // Push block 21 into the store
        let eds = generate_dummy_eds(2, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        store.insert(header).await.unwrap();

        // Sample block 21
        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;

        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn concurrency_limits() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        // Concurrency limit
        let concurrency_limit = 10;
        // Additional concurrency limit for heads.
        // In other words, the concurrency limit becomes 15.
        let additional_headersub_concurrency = 5;
        // Default number of shares that ExtendedHeaderGenerator
        // generates per block.
        let shares_per_block = 4;

        let mut gen = ExtendedHeaderGenerator::new();
        store.insert(gen.next_many_verified(30)).await.unwrap();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit,
            additional_headersub_concurrency,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        let mut hold_respond_channels = Vec::new();

        for _ in 0..(concurrency_limit * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit reached
        handle.expect_no_cmd().await;

        // However, a new head will be allowed because the additional limit is applied
        store.insert(gen.next_many_verified(2)).await.unwrap();

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }
        handle.expect_no_cmd().await;

        // Now 11 blocks are ongoing. In order for Daser to schedule the next
        // one, 2 blocks need to finish.
        // We stop sampling for blocks 29 and 30 in order to simulate this.
        stop_sampling_for(&mut hold_respond_channels, 29);
        stop_sampling_for(&mut hold_respond_channels, 30);

        // Now Daser will schedule the next block.
        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // And... concurrency limit is reached again.
        handle.expect_no_cmd().await;

        // Generate 5 more heads
        for _ in 0..additional_headersub_concurrency {
            store.insert(gen.next_many_verified(1)).await.unwrap();
            // Give some time for Daser to schedule it
            sleep(Duration::from_millis(10)).await;
        }

        for _ in 0..(additional_headersub_concurrency * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit for heads is reached
        handle.expect_no_cmd().await;
        store.insert(gen.next_many_verified(1)).await.unwrap();
        handle.expect_no_cmd().await;

        // Now we stop 1 block and Daser will schedule the head
        // we generated above.
        stop_sampling_for(&mut hold_respond_channels, 28);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit for heads is reached again
        handle.expect_no_cmd().await;
        store.insert(gen.next_many_verified(1)).await.unwrap();
        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn ratelimit() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: Duration::from_secs(60),
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        let now = Time::now();
        let first_header_time = (now - Duration::from_secs(1024)).unwrap();
        gen.set_time(first_header_time, Duration::from_secs(1));
        store.insert(gen.next_many(990)).await.unwrap();

        let mut edses = HashMap::new();

        for height in 991..=1000 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = gen.next_with_dah(dah);

            edses.insert(height, eds);
            store.insert(header).await.unwrap();
        }

        // Assume that Pruner reports the following
        daser.update_highest_prunable_block(1000).await.unwrap();
        daser.update_number_of_prunable_blocks(1000).await.unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        // Blocks that are currently stored are ratelimited, so we shouldn't get any command.
        handle.expect_no_cmd().await;

        // Any blocks above 1000 are not limited because they are not in the prunable area
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;

        // Back to being ratelimited again
        handle.expect_no_cmd().await;

        // Now Pruner reports that the number of prunable blocks is lower than the threshold
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        // Sample blocks that are in the prunable area
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&1000).unwrap(),
            1000,
            false,
        )
        .await;
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&999).unwrap(),
            999,
            false,
        )
        .await;

        // Now Pruner reports that the number of prunable blocks reached the threshold again
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        // But sampling of block 998 was already scheduled, so we need to handle it
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&998).unwrap(),
            998,
            false,
        )
        .await;

        // Daser is ratelimited again
        handle.expect_no_cmd().await;
    }

    fn stop_sampling_for(
        responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
        height: u64,
    ) {
        let mut indexes = Vec::new();

        for (idx, (cid, _)) in responders.iter().enumerate() {
            let sample_id: SampleId = cid.try_into().unwrap();
            if sample_id.block_height() == height {
                indexes.push(idx)
            }
        }

        for idx in indexes.into_iter().rev() {
            let (_cid, respond_to) = responders.remove(idx);
            respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
        }
    }

    async fn gen_and_sample_block(
        handle: &mut MockP2pHandle,
        gen: &mut ExtendedHeaderGenerator,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        square_width: usize,
        simulate_sampling_timeout: bool,
    ) {
        let eds = generate_dummy_eds(square_width, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        let height = header.height().value();

        store.insert(header).await.unwrap();

        sample_block(
            handle,
            store,
            event_sub,
            &eds,
            height,
            simulate_sampling_timeout,
        )
        .await;

        // `gen_and_sample_block` is only used in cases where Daser doesn't have
        // anything else to schedule.
        handle.expect_no_cmd().await;
        assert!(event_sub.try_recv().is_err());
    }

    async fn sample_block(
        handle: &mut MockP2pHandle,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        eds: &ExtendedDataSquare,
        height: u64,
        simulate_sampling_timeout: bool,
    ) {
        let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;

        // Wait for the block to be sampled
        sleep(Duration::from_millis(100)).await;

        // Check if the block was sampled or timed out.
        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
        assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);

        // Check if the CIDs we requested successfully made it into the store
        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();
        assert_eq!(&sampling_metadata.cids, &cids);

        // Check if we received the `SamplingStarted` event
        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                // Make sure the share list matches the CIDs we received
                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        // Check if we received `ShareSamplingResult` for each share
        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    timed_out,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    assert_eq!(
                        timed_out,
                        simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
                    );
                    // Make sure it is in the list and remove it
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        assert!(remaining_shares.is_empty());

        // Check if we received `SamplingResult` for the block
        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingResult {
                height: ev_height,
                timed_out,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(timed_out, simulate_sampling_timeout);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }
    }

    /// Responds to get_shwap_cid and returns all CIDs that were requested
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_sampling_timeout: bool,
            needed_samples: usize,
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_sampling_timeout)| {
                let square_width = eds.square_width() as usize;
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_sampling_timeout,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            // Simulate sampling timeout
            if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
                respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        cids.sort();
        cids
    }

    /// Responds to get_shwap_cid and returns all CIDs that were requested
    async fn handle_get_shwap_cid(
        handle: &mut MockP2pHandle,
        height: u64,
        eds: &ExtendedDataSquare,
        simulate_sampling_timeout: bool,
    ) -> Vec<Cid> {
        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
    }

    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
        let sample = Sample::new(
            sample_id.row_index(),
            sample_id.column_index(),
            AxisType::Row,
            eds,
        )
        .unwrap();

        let mut container = BytesMut::new();
        sample.encode(&mut container);

        let block = Block {
            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
            container: container.to_vec(),
        };

        block.encode_to_vec()
    }
}