lumina_node/
daser.rs

1//! Component responsible for data availability sampling of blocks whose headers
2//! were already synchronized and announced on the Celestia network.
3
4use std::collections::HashSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::future::BoxFuture;
9use futures::stream::FuturesUnordered;
10use futures::{FutureExt, StreamExt};
11use lumina_utils::executor::{JoinHandle, spawn};
12use lumina_utils::time::{Instant, Interval};
13use rand::Rng;
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::events::{EventPublisher, NodeEvent};
21use crate::p2p::shwap::sample_cid;
22use crate::p2p::{P2p, P2pError};
23use crate::store::{BlockRanges, Store, StoreError};
24use crate::utils::{OneshotSenderExt, TimeExt};
25
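// Maximum number of random shares sampled per block.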
26const MAX_SAMPLES_NEEDED: usize = 16;
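// Lower bound for the timeout of a single sample request (see `calc_timeout`).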
27const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
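// When at least this many blocks are waiting to be pruned, sampling of blocks
// inside the prunable area is paused.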
28const PRUNER_THRESHOLD: u64 = 512;
29
30pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
31pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;
32
33type Result<T, E = DaserError> = std::result::Result<T, E>;
34
35/// Representation of all the errors that can occur in `Daser` component.
36#[derive(Debug, thiserror::Error)]
37pub enum DaserError {
38    /// An error propagated from the `P2p` component.
39    #[error("P2p: {0}")]
40    P2p(#[from] P2pError),
41
42    /// An error propagated from the [`Store`] component.
43    #[error("Store: {0}")]
44    Store(#[from] StoreError),
45
46    /// The worker has died.
47    #[error("Worker died")]
48    WorkerDied,
49
50    /// Channel closed unexpectedly.
51    #[error("Channel closed unexpectedly")]
52    ChannelClosedUnexpectedly,
53}
54
55impl From<oneshot::error::RecvError> for DaserError {
56    fn from(_value: oneshot::error::RecvError) -> Self {
57        DaserError::ChannelClosedUnexpectedly
58    }
59}
60
61/// Component responsible for data availability sampling of blocks from the network.
62pub(crate) struct Daser {
63    cmd_tx: mpsc::Sender<DaserCmd>,
64    cancellation_token: CancellationToken,
65    join_handle: JoinHandle,
66}
67
68/// Arguments used to configure the [`Daser`].
69pub(crate) struct DaserArgs<S>
70where
71    S: Store,
72{
73    /// Handler for the peer to peer messaging.
74    pub(crate) p2p: Arc<P2p>,
75    /// Headers storage.
76    pub(crate) store: Arc<S>,
77    /// Event publisher.
78    pub(crate) event_pub: EventPublisher,
79    /// Size of the sampling window.
80    pub(crate) sampling_window: Duration,
81    /// How many blocks can be data sampled at the same time.
82    pub(crate) concurrency_limit: usize,
83    /// How many additional blocks can be data sampled if they are from HeaderSub.
84    pub(crate) additional_headersub_concurrency: usize,
85}
86
87#[derive(Debug)]
88pub(crate) enum DaserCmd {
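    /// Used by Pruner to report the highest height that is inside the prunable area.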
89    UpdateHighestPrunableHeight {
90        value: u64,
91    },
92
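    /// Used by Pruner to report how many blocks are currently waiting to be pruned.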
93    UpdateNumberOfPrunableBlocks {
94        value: u64,
95    },
96
97    /// Used by Pruner to tell Daser about a block that is going to be pruned.
98    /// Daser then replies with `true` if Pruner is allowed to do it.
99    ///
100    /// This is needed to avoid the following race condition:
101    ///
102    /// `Store` is very tightly integrated with `beetswap::Multihasher`,
103    /// so when Daser starts data sampling a block, its header must stay in
104    /// the `Store` until the data sampling is finished. This can be fixed
105    /// only if we decouple `Store` from `beetswap::Multihasher`.
106    ///
107    /// However, even if we fix the above, a second problem arises: when Pruner
108    /// removes the header and samples of an ongoing data sampling, how are we
109    /// going to handle the incoming CIDs? We need to somehow make sure that
110    /// Pruner removes them after the sampling is finished.
111    ///
112    /// After the above issues are fixed, this can be removed.
113    WantToPrune {
114        height: u64,
115        respond_to: oneshot::Sender<bool>,
116    },
117}
118
119impl Daser {
120    /// Create and start the [`Daser`].
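    ///
    /// A minimal sketch of how the node could wire it up, assuming `p2p`, `store`
    /// and `events` were already created during node setup (not a runnable doctest):
    ///
    /// ```ignore
    /// let daser = Daser::start(DaserArgs {
    ///     p2p: p2p.clone(),
    ///     store: store.clone(),
    ///     event_pub: events.publisher(),
    ///     sampling_window: SAMPLING_WINDOW,
    ///     concurrency_limit: DEFAULT_CONCURENCY_LIMIT,
    ///     additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
    /// })?;
    /// ```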
121    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
122    where
123        S: Store + 'static,
124    {
125        let cancellation_token = CancellationToken::new();
126        let event_pub = args.event_pub.clone();
127        let (cmd_tx, cmd_rx) = mpsc::channel(16);
128        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;
129
130        let join_handle = spawn(async move {
131            if let Err(e) = worker.run().await {
132                error!("Daser stopped because of a fatal error: {e}");
133
134                event_pub.send(NodeEvent::FatalDaserError {
135                    error: e.to_string(),
136                });
137            }
138        });
139
140        Ok(Daser {
141            cmd_tx,
142            cancellation_token,
143            join_handle,
144        })
145    }
146
147    #[cfg(test)]
148    pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
149        let (cmd_tx, cmd_rx) = mpsc::channel(16);
150        let cancellation_token = CancellationToken::new();
151
152        // Just a fake join_handle
153        let join_handle = spawn(async {});
154
155        let daser = Daser {
156            cmd_tx,
157            cancellation_token,
158            join_handle,
159        };
160
161        let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };
162
163        (daser, mock_handle)
164    }
165
166    /// Stop the worker.
167    pub(crate) fn stop(&self) {
168        // Signal the Worker to stop.
169        self.cancellation_token.cancel();
170    }
171
172    /// Wait until worker is completely stopped.
173    pub(crate) async fn join(&self) {
174        self.join_handle.join().await;
175    }
176
177    async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
178        self.cmd_tx
179            .send(cmd)
180            .await
181            .map_err(|_| DaserError::WorkerDied)
182    }
183
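    /// Ask [`Daser`] whether the block at `height` may be pruned.
    ///
    /// Returns `false` if the block is part of an ongoing sampling and must not be
    /// pruned yet. A minimal sketch of the intended Pruner-side call, assuming a
    /// hypothetical `prune_block` helper:
    ///
    /// ```ignore
    /// if daser.want_to_prune(height).await? {
    ///     prune_block(height).await?;
    /// }
    /// ```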
184    pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
185        let (tx, rx) = oneshot::channel();
186
187        self.send_command(DaserCmd::WantToPrune {
188            height,
189            respond_to: tx,
190        })
191        .await?;
192
193        Ok(rx.await?)
194    }
195
196    pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
197        self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
198            .await
199    }
200
201    pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
202        self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
203            .await
204    }
205}
206
207impl Drop for Daser {
208    fn drop(&mut self) {
209        self.stop();
210    }
211}
212
213struct Worker<S>
214where
215    S: Store + 'static,
216{
217    cmd_rx: mpsc::Receiver<DaserCmd>,
218    cancellation_token: CancellationToken,
219    event_pub: EventPublisher,
220    p2p: Arc<P2p>,
221    store: Arc<S>,
222    max_samples_needed: usize,
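    // Per-block sampling tasks in flight; each future resolves to `(height, timed_out)`.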
223    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
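    // Blocks that still need to be sampled, consumed from the highest height down.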
224    queue: BlockRanges,
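    // Blocks whose sampling timed out or that fell out of the sampling window.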
225    timed_out: BlockRanges,
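    // Blocks that are currently being sampled.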
226    ongoing: BlockRanges,
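    // Blocks that the Pruner was allowed to prune; they must not be queued again.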
227    will_be_pruned: BlockRanges,
228    sampling_window: Duration,
229    concurrency_limit: usize,
230    additional_headersub_concurency: usize,
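    // Height of the newest stored header; the head gets additional sampling concurrency.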
231    head_height: Option<u64>,
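    // Highest height inside the prunable area, as reported by the Pruner.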
232    highest_prunable_height: Option<u64>,
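    // Number of blocks waiting to be pruned, as reported by the Pruner.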
233    num_of_prunable_blocks: u64,
234}
235
236impl<S> Worker<S>
237where
238    S: Store,
239{
240    fn new(
241        args: DaserArgs<S>,
242        cancellation_token: CancellationToken,
243        cmd_rx: mpsc::Receiver<DaserCmd>,
244    ) -> Result<Worker<S>> {
245        Ok(Worker {
246            cmd_rx,
247            cancellation_token,
248            event_pub: args.event_pub,
249            p2p: args.p2p,
250            store: args.store,
251            max_samples_needed: MAX_SAMPLES_NEEDED,
252            sampling_futs: FuturesUnordered::new(),
253            queue: BlockRanges::default(),
254            timed_out: BlockRanges::default(),
255            ongoing: BlockRanges::default(),
256            will_be_pruned: BlockRanges::default(),
257            sampling_window: args.sampling_window,
258            concurrency_limit: args.concurrency_limit,
259            additional_headersub_concurency: args.additional_headersub_concurrency,
260            head_height: None,
261            highest_prunable_height: None,
262            num_of_prunable_blocks: 0,
263        })
264    }
265
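    // The worker alternates between two states: waiting for at least one connected
    // peer (`connecting_event_loop`) and data sampling (`connected_event_loop`),
    // until the cancellation token is triggered.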
266    async fn run(&mut self) -> Result<()> {
267        loop {
268            if self.cancellation_token.is_cancelled() {
269                break;
270            }
271
272            self.connecting_event_loop().await;
273
274            if self.cancellation_token.is_cancelled() {
275                break;
276            }
277
278            self.connected_event_loop().await?;
279        }
280
281        debug!("Daser stopped");
282        Ok(())
283    }
284
285    async fn connecting_event_loop(&mut self) {
286        debug!("Entering connecting_event_loop");
287
288        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
289
290        // Check if connection status changed before the watcher was created
291        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
292            return;
293        }
294
295        loop {
296            select! {
297                _ = self.cancellation_token.cancelled() => {
298                    break;
299                }
300                _ = peer_tracker_info_watcher.changed() => {
301                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
302                        break;
303                    }
304                }
305                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
306            }
307        }
308    }
309
310    async fn connected_event_loop(&mut self) -> Result<()> {
311        debug!("Entering connected_event_loop");
312
313        let mut report_interval = Interval::new(Duration::from_secs(60));
314        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
315
316        // Check if connection status changed before the watcher was created
317        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
318            warn!("All peers disconnected");
319            return Ok(());
320        }
321
322        // Workaround because `wait_new_head` is not cancel-safe.
323        //
324        // TODO: Only Syncer adds new headers to the store, so ideally
325        // Syncer should inform Daser that new headers were added.
326        let store = self.store.clone();
327        let mut wait_new_head = store.wait_new_head();
328
329        self.update_queue().await?;
330
331        let mut first_report = true;
332
333        loop {
334            // Start as many data samplings as we are allowed.
335            while self.schedule_next_sample_block().await? {}
336
337            if first_report {
338                self.report().await?;
339                first_report = false;
340            }
341
342            select! {
343                _ = self.cancellation_token.cancelled() => {
344                    break;
345                }
346                _ = peer_tracker_info_watcher.changed() => {
347                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
348                        warn!("All peers disconnected");
349                        break;
350                    }
351                }
352                _ = report_interval.tick() => self.report().await?,
353                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
354                Some(res) = self.sampling_futs.next() => {
355                    // Beetswap only returns fatal errors that are not related
356                    // to P2P or networking.
357                    let (height, timed_out) = res?;
358
359                    if timed_out {
360                        self.timed_out.insert_relaxed(height..=height).expect("invalid height");
361                    } else {
362                        self.store.mark_as_sampled(height).await?;
363                    }
364
365                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
366                },
367                _ = &mut wait_new_head => {
368                    wait_new_head = store.wait_new_head();
369                    self.update_queue().await?;
370                }
371            }
372        }
373
374        self.sampling_futs.clear();
375        self.queue = BlockRanges::default();
376        self.ongoing = BlockRanges::default();
377        self.timed_out = BlockRanges::default();
378        self.head_height = None;
379
380        Ok(())
381    }
382
383    #[instrument(skip_all)]
384    async fn report(&mut self) -> Result<()> {
385        let sampled = self.store.get_sampled_ranges().await?;
386
387        info!(
388            "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
389            sampled, &self.ongoing,
390        );
391
392        Ok(())
393    }
394
395    async fn on_cmd(&mut self, cmd: DaserCmd) {
396        match cmd {
397            DaserCmd::WantToPrune { height, respond_to } => {
398                let res = self.on_want_to_prune(height).await;
399                respond_to.maybe_send(res);
400            }
401            DaserCmd::UpdateHighestPrunableHeight { value } => {
402                self.highest_prunable_height = Some(value);
403            }
404            DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
405                self.num_of_prunable_blocks = value;
406            }
407        }
408    }
409
410    async fn on_want_to_prune(&mut self, height: u64) -> bool {
411        // Pruner should not remove headers that are related to an ongoing sampling.
412        if self.ongoing.contains(height) {
413            return false;
414        }
415
416        // Header will be pruned, so we remove it from the queue to avoid race conditions.
417        self.queue
418            .remove_relaxed(height..=height)
419            .expect("invalid height");
420        // We also make sure `update_queue` will not put it back.
421        self.will_be_pruned
422            .insert_relaxed(height..=height)
423            .expect("invalid height");
424
425        true
426    }
427
428    async fn schedule_next_sample_block(&mut self) -> Result<bool> {
429        // Schedule the most recent un-sampled block.
430        let header = loop {
431            let Some(height) = self.queue.pop_head() else {
432                return Ok(false);
433            };
434
435            let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
436                && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
437            {
438                // If a block is in the prunable area and Pruner has a lot of blocks
439                // to prune, then we pause sampling.
440                0
441            } else if height == self.head_height.unwrap_or(0) {
442                // For head we allow additional concurrency
443                self.concurrency_limit + self.additional_headersub_concurency
444            } else {
445                self.concurrency_limit
446            };
447
448            if self.sampling_futs.len() >= concurrency_limit {
449                // If concurrency limit is reached, then we pause sampling
450                // and we put back the height we previously popped.
451                self.queue
452                    .insert_relaxed(height..=height)
453                    .expect("invalid height");
454                return Ok(false);
455            }
456
457            match self.store.get_by_height(height).await {
458                Ok(header) => break header,
459                Err(StoreError::NotFound) => {
460                    // Height was pruned and our queue is inconsistent.
461                    // Repopulate queue and try again.
462                    self.update_queue().await?;
463                }
464                Err(e) => return Err(e.into()),
465            }
466        };
467
468        let height = header.height().value();
469        let square_width = header.dah.square_width();
470
471        // Make sure that the block is still in the sampling window.
472        if !self.in_sampling_window(header.time()) {
473            // As soon as we reach a block that is not in the sampling
474            // window, none of the older blocks in the queue are either.
475            self.queue
476                .remove_relaxed(1..=height)
477                .expect("invalid height");
478            self.timed_out
479                .insert_relaxed(1..=height)
480                .expect("invalid height");
481            return Ok(false);
482        }
483
484        // Select random shares to be sampled
485        let share_indexes = random_indexes(square_width, self.max_samples_needed);
486
487        // Update the CID list before we start sampling, otherwise we could leak
488        // CIDs, causing the associated blocks to never get cleaned from the blockstore.
489        let cids = share_indexes
490            .iter()
491            .map(|(row, col)| sample_cid(*row, *col, height))
492            .collect::<Result<Vec<_>, _>>()?;
493        self.store.update_sampling_metadata(height, cids).await?;
494
495        let p2p = self.p2p.clone();
496        let event_pub = self.event_pub.clone();
497        let sampling_window = self.sampling_window;
498
499        // Schedule retrieval of the CIDs. This will be run later on in the `select!` loop.
500        let fut = async move {
501            let now = Instant::now();
502
503            event_pub.send(NodeEvent::SamplingStarted {
504                height,
505                square_width,
506                shares: share_indexes.iter().copied().collect(),
507            });
508
509            // We set the timeout so that it lasts until the block goes out of the sampling window.
510            let timeout = calc_timeout(header.time(), Time::now(), sampling_window);
511
512            // Initialize all futures
513            let mut futs = share_indexes
514                .into_iter()
515                .map(|(row, col)| {
516                    let p2p = p2p.clone();
517
518                    async move {
519                        let res = p2p.get_sample(row, col, height, Some(timeout)).await;
520                        (row, col, res)
521                    }
522                })
523                .collect::<FuturesUnordered<_>>();
524
525            let mut sampling_timed_out = false;
526
527            // Run futures to completion
528            while let Some((row, column, res)) = futs.next().await {
529                let timed_out = match res {
530                    Ok(_) => false,
531                    // Validation is done at Bitswap level, through `ShwapMultihasher`.
532                    // If the sample is not valid, it will never be delivered to us
533                    // as the data of the CID. Because of that, the only signal
534                    // that data sampling verification failed is query timing out.
535                    Err(P2pError::BitswapQueryTimeout) => true,
536                    Err(e) => return Err(e.into()),
537                };
538
539                if timed_out {
540                    sampling_timed_out = true;
541                }
542
543                event_pub.send(NodeEvent::ShareSamplingResult {
544                    height,
545                    square_width,
546                    row,
547                    column,
548                    timed_out,
549                });
550            }
551
552            event_pub.send(NodeEvent::SamplingResult {
553                height,
554                timed_out: sampling_timed_out,
555                took: now.elapsed(),
556            });
557
558            Ok((height, sampling_timed_out))
559        }
560        .boxed();
561
562        self.sampling_futs.push(fut);
563        self.ongoing
564            .insert_relaxed(height..=height)
565            .expect("invalid height");
566
567        Ok(true)
568    }
569
570    /// Rebuild the queue of blocks that need to be sampled.
571    async fn update_queue(&mut self) -> Result<()> {
572        let stored = self.store.get_stored_header_ranges().await?;
573        let sampled = self.store.get_sampled_ranges().await?;
574
575        self.head_height = stored.head();
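        // The queue is everything stored, minus blocks already sampled, timed out,
        // currently being sampled, or about to be pruned.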
576        self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;
577
578        Ok(())
579    }
580
581    /// Returns true if `time` is within the sampling window.
582    fn in_sampling_window(&self, time: Time) -> bool {
583        let now = Time::now();
584
585        // Header is from the future! Thus, within sampling window.
586        if now < time {
587            return true;
588        }
589
590        let Ok(age) = now.duration_since(time) else {
591            return false;
592        };
593
594        age <= self.sampling_window
595    }
596}
597
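/// Calculate the timeout for a single sample request: the time remaining until the
/// block falls out of the sampling window, but never less than [`GET_SAMPLE_MIN_TIMEOUT`].
///
/// For example, with a 60 second window a header that is 49 seconds old gets an
/// 11 second timeout, while a 55 second old header gets the 10 second minimum.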
598fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
599    let sampling_window_end = now.saturating_sub(sampling_window);
600
601    header_time
602        .duration_since(sampling_window_end)
603        .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
604        .max(GET_SAMPLE_MIN_TIMEOUT)
605}
606
607/// Returns unique and random indexes that will be used for sampling.
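///
/// If the square has no more shares than `max_samples_needed` (e.g. a 2x2 square with
/// the default limit of 16), every coordinate of the square is returned instead.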
608fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
609    let samples_in_block = usize::from(square_width).pow(2);
610
611    // If the block has no more samples than `max_samples_needed`, we are going
612    // to sample the whole block. Randomness is not needed for this.
613    if samples_in_block <= max_samples_needed {
614        return (0..square_width)
615            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
616            .collect();
617    }
618
619    let mut indexes = HashSet::with_capacity(max_samples_needed);
620    let mut rng = rand::thread_rng();
621
622    while indexes.len() < max_samples_needed {
623        let row = rng.r#gen::<u16>() % square_width;
624        let col = rng.r#gen::<u16>() % square_width;
625        indexes.insert((row, col));
626    }
627
628    indexes
629}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634    use crate::events::{EventChannel, EventSubscriber};
635    use crate::node::SAMPLING_WINDOW;
636    use crate::p2p::P2pCmd;
637    use crate::p2p::shwap::convert_cid;
638    use crate::store::InMemoryStore;
639    use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
640    use crate::utils::OneshotResultSender;
641    use bytes::BytesMut;
642    use celestia_proto::bitswap::Block;
643    use celestia_types::consts::appconsts::AppVersion;
644    use celestia_types::sample::{Sample, SampleId};
645    use celestia_types::test_utils::{ExtendedHeaderGenerator, generate_dummy_eds};
646    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
647    use cid::Cid;
648    use lumina_utils::test_utils::async_test;
649    use lumina_utils::time::sleep;
650    use prost::Message;
651    use std::collections::HashMap;
652    use std::time::Duration;
653
654    // Request number for which tests will simulate sampling timeout
655    //
656    // NOTE: The smallest block has 4 shares, so a 2nd request will always happen.
657    const REQ_TIMEOUT_SHARE_NUM: usize = 2;
658
659    #[async_test]
660    async fn check_calc_timeout() {
661        let now = Time::now();
662        let sampling_window = Duration::from_secs(60);
663
664        let header_time = now;
665        assert_eq!(
666            calc_timeout(header_time, now, sampling_window),
667            Duration::from_secs(60)
668        );
669
670        let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
671        assert_eq!(
672            calc_timeout(header_time, now, sampling_window),
673            Duration::from_secs(59)
674        );
675
676        let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
677        assert_eq!(
678            calc_timeout(header_time, now, sampling_window),
679            Duration::from_secs(11)
680        );
681
682        let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
683        assert_eq!(
684            calc_timeout(header_time, now, sampling_window),
685            Duration::from_secs(10)
686        );
687
688        // minimum timeout edge 1
689        let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
690        assert_eq!(
691            calc_timeout(header_time, now, sampling_window),
692            GET_SAMPLE_MIN_TIMEOUT
693        );
694
695        // minimum timeout edge 2
696        let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
697        assert_eq!(
698            calc_timeout(header_time, now, sampling_window),
699            GET_SAMPLE_MIN_TIMEOUT
700        );
701
702        // header outside of the sampling window
703        let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
704        assert_eq!(
705            calc_timeout(header_time, now, sampling_window),
706            GET_SAMPLE_MIN_TIMEOUT
707        );
708
709        // header from the "future"
710        let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
711        assert_eq!(
712            calc_timeout(header_time, now, sampling_window),
713            Duration::from_secs(61)
714        );
715    }
716
717    #[async_test]
718    async fn received_valid_samples() {
719        let (mock, mut handle) = P2p::mocked();
720        let store = Arc::new(InMemoryStore::new());
721        let events = EventChannel::new();
722        let mut event_sub = events.subscribe();
723
724        let _daser = Daser::start(DaserArgs {
725            event_pub: events.publisher(),
726            p2p: Arc::new(mock),
727            store: store.clone(),
728            sampling_window: SAMPLING_WINDOW,
729            concurrency_limit: 1,
730            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
731        })
732        .unwrap();
733
734        let mut generator = ExtendedHeaderGenerator::new();
735
736        handle.expect_no_cmd().await;
737        handle.announce_peer_connected();
738        handle.expect_no_cmd().await;
739
740        gen_and_sample_block(
741            &mut handle,
742            &mut generator,
743            &store,
744            &mut event_sub,
745            2,
746            false,
747        )
748        .await;
749        gen_and_sample_block(
750            &mut handle,
751            &mut generator,
752            &store,
753            &mut event_sub,
754            4,
755            false,
756        )
757        .await;
758        gen_and_sample_block(
759            &mut handle,
760            &mut generator,
761            &store,
762            &mut event_sub,
763            8,
764            false,
765        )
766        .await;
767        gen_and_sample_block(
768            &mut handle,
769            &mut generator,
770            &store,
771            &mut event_sub,
772            16,
773            false,
774        )
775        .await;
776    }
777
778    #[async_test]
779    async fn sampling_timeout() {
780        let (mock, mut handle) = P2p::mocked();
781        let store = Arc::new(InMemoryStore::new());
782        let events = EventChannel::new();
783        let mut event_sub = events.subscribe();
784
785        let _daser = Daser::start(DaserArgs {
786            event_pub: events.publisher(),
787            p2p: Arc::new(mock),
788            store: store.clone(),
789            sampling_window: SAMPLING_WINDOW,
790            concurrency_limit: 1,
791            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
792        })
793        .unwrap();
794
795        let mut generator = ExtendedHeaderGenerator::new();
796
797        handle.expect_no_cmd().await;
798        handle.announce_peer_connected();
799        handle.expect_no_cmd().await;
800
801        gen_and_sample_block(
802            &mut handle,
803            &mut generator,
804            &store,
805            &mut event_sub,
806            2,
807            false,
808        )
809        .await;
810        gen_and_sample_block(&mut handle, &mut generator, &store, &mut event_sub, 4, true).await;
811        gen_and_sample_block(
812            &mut handle,
813            &mut generator,
814            &store,
815            &mut event_sub,
816            8,
817            false,
818        )
819        .await;
820    }
821
822    #[async_test]
823    async fn backward_dasing() {
824        let (mock, mut handle) = P2p::mocked();
825        let store = Arc::new(InMemoryStore::new());
826        let events = EventChannel::new();
827
828        let _daser = Daser::start(DaserArgs {
829            event_pub: events.publisher(),
830            p2p: Arc::new(mock),
831            store: store.clone(),
832            sampling_window: SAMPLING_WINDOW,
833            concurrency_limit: 1,
834            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
835        })
836        .unwrap();
837
838        let mut generator = ExtendedHeaderGenerator::new();
839
840        handle.expect_no_cmd().await;
841        handle.announce_peer_connected();
842        handle.expect_no_cmd().await;
843
844        let mut edses = Vec::new();
845        let mut headers = Vec::new();
846
847        for _ in 0..20 {
848            let eds = generate_dummy_eds(2, AppVersion::V2);
849            let dah = DataAvailabilityHeader::from_eds(&eds);
850            let header = generator.next_with_dah(dah);
851
852            edses.push(eds);
853            headers.push(header);
854        }
855
856        // Insert headers of blocks 5 to 10
857        store.insert(headers[4..=9].to_vec()).await.unwrap();
858
859        // Sample block 10
860        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;
861
862        // Sample block 9
863        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;
864
865        // To avoid race conditions we wait a bit for block 8 to be scheduled
866        sleep(Duration::from_millis(10)).await;
867
868        // Insert headers of blocks 16 to 20
869        store.insert(headers[15..=19].to_vec()).await.unwrap();
870
871        // To avoid race conditions we wait a bit for the new head (block 20) to be scheduled
872        sleep(Duration::from_millis(10)).await;
873
874        // Now daser runs two concurrent data samplings: block 8 and block 20
875        handle_concurrent_get_shwap_cid(
876            &mut handle,
877            [(8, &edses[7], false), (20, &edses[19], false)],
878        )
879        .await;
880
881        // Sample and reject block 19
882        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;
883
884        // Simulate disconnection
885        handle.announce_all_peers_disconnected();
886
887        // Daser may have scheduled block 18 already, so we need to reply to those requests.
888        // For the sake of the test we reply with a bitswap timeout.
889        while let Some(cmd) = handle.try_recv_cmd().await {
890            match cmd {
891                P2pCmd::GetShwapCid { respond_to, .. } => {
892                    let _ = respond_to.send(Err(P2pError::BitswapQueryTimeout));
893                }
894                cmd => panic!("Unexpected command: {cmd:?}"),
895            }
896        }
897
898        // We shouldn't have any other requests from daser because it's in the connecting state.
899        handle.expect_no_cmd().await;
900
901        // Simulate that a peer connected
902        handle.announce_peer_connected();
903
904        // Because of disconnection and previous rejection of block 19, daser will resample it
905        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;
906
907        // Sample block 16 until 18
908        // Sample blocks 18 down to 16
909            let idx = height as usize - 1;
910            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
911        }
912
913        // Sample blocks 7 down to 5
914        for height in (5..=7).rev() {
915            let idx = height as usize - 1;
916            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
917        }
918
919        handle.expect_no_cmd().await;
920
921        // Push block 21 in the store
922        let eds = generate_dummy_eds(2, AppVersion::V2);
923        let dah = DataAvailabilityHeader::from_eds(&eds);
924        let header = generator.next_with_dah(dah);
925        store.insert(header).await.unwrap();
926
927        // Sample block 21
928        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;
929
930        handle.expect_no_cmd().await;
931    }
932
933    #[async_test]
934    async fn concurrency_limits() {
935        let (mock, mut handle) = P2p::mocked();
936        let store = Arc::new(InMemoryStore::new());
937        let events = EventChannel::new();
938
939        // Concurrency limit
940        let concurrency_limit = 10;
941        // Additional concurrency limit for heads.
942        // In other words, the concurrency limit becomes 15.
943        let additional_headersub_concurrency = 5;
944        // Default number of shares that ExtendedHeaderGenerator
945        // generates per block.
946        let shares_per_block = 4;
947
948        let mut generator = ExtendedHeaderGenerator::new();
949        store
950            .insert(generator.next_many_verified(30))
951            .await
952            .unwrap();
953
954        let _daser = Daser::start(DaserArgs {
955            event_pub: events.publisher(),
956            p2p: Arc::new(mock),
957            store: store.clone(),
958            sampling_window: SAMPLING_WINDOW,
959            concurrency_limit,
960            additional_headersub_concurrency,
961        })
962        .unwrap();
963
964        handle.expect_no_cmd().await;
965        handle.announce_peer_connected();
966
967        let mut hold_respond_channels = Vec::new();
968
969        for _ in 0..(concurrency_limit * shares_per_block) {
970            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
971            hold_respond_channels.push((cid, respond_to));
972        }
973
974        // Concurrency limit reached
975        handle.expect_no_cmd().await;
976
977        // However, a new head will be allowed because the additional limit is applied
978        store.insert(generator.next_many_verified(2)).await.unwrap();
979
980        for _ in 0..shares_per_block {
981            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
982            hold_respond_channels.push((cid, respond_to));
983        }
984        handle.expect_no_cmd().await;
985
986        // Now 11 blocks are ongoing. In order for Daser to schedule the next
987        // one, 2 blocks need to finish.
988        // We stop sampling for blocks 29 and 30 in order to simulate this.
989        stop_sampling_for(&mut hold_respond_channels, 29);
990        stop_sampling_for(&mut hold_respond_channels, 30);
991
992        // Now Daser will schedule the next block.
993        for _ in 0..shares_per_block {
994            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
995            hold_respond_channels.push((cid, respond_to));
996        }
997
998        // And... concurrency limit is reached again.
999        handle.expect_no_cmd().await;
1000
1001        // Generate 5 more heads
1002        for _ in 0..additional_headersub_concurrency {
1003            store.insert(generator.next_many_verified(1)).await.unwrap();
1004            // Give some time for Daser to schedule it
1005            sleep(Duration::from_millis(10)).await;
1006        }
1007
1008        for _ in 0..(additional_headersub_concurrency * shares_per_block) {
1009            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
1010            hold_respond_channels.push((cid, respond_to));
1011        }
1012
1013        // Concurrency limit for heads is reached
1014        handle.expect_no_cmd().await;
1015        store.insert(generator.next_many_verified(1)).await.unwrap();
1016        handle.expect_no_cmd().await;
1017
1018        // Now we stop 1 block and Daser will schedule the head
1019        // we generated above.
1020        stop_sampling_for(&mut hold_respond_channels, 28);
1021
1022        for _ in 0..shares_per_block {
1023            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
1024            hold_respond_channels.push((cid, respond_to));
1025        }
1026
1027        // Concurrency limit for heads is reached again
1028        handle.expect_no_cmd().await;
1029        store.insert(generator.next_many_verified(1)).await.unwrap();
1030        handle.expect_no_cmd().await;
1031    }
1032
1033    #[async_test]
1034    async fn ratelimit() {
1035        let (mock, mut handle) = P2p::mocked();
1036        let store = Arc::new(InMemoryStore::new());
1037        let events = EventChannel::new();
1038        let mut event_sub = events.subscribe();
1039
1040        let daser = Daser::start(DaserArgs {
1041            event_pub: events.publisher(),
1042            p2p: Arc::new(mock),
1043            store: store.clone(),
1044            sampling_window: Duration::from_secs(60),
1045            concurrency_limit: 1,
1046            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
1047        })
1048        .unwrap();
1049
1050        let mut generator = ExtendedHeaderGenerator::new();
1051
1052        let now = Time::now();
1053        let first_header_time = (now - Duration::from_secs(1024)).unwrap();
1054        generator.set_time(first_header_time, Duration::from_secs(1));
1055        store.insert(generator.next_many(990)).await.unwrap();
1056
1057        let mut edses = HashMap::new();
1058
1059        for height in 991..=1000 {
1060            let eds = generate_dummy_eds(2, AppVersion::V2);
1061            let dah = DataAvailabilityHeader::from_eds(&eds);
1062            let header = generator.next_with_dah(dah);
1063
1064            edses.insert(height, eds);
1065            store.insert(header).await.unwrap();
1066        }
1067
1068        // Assume that Pruner reports the following
1069        daser.update_highest_prunable_block(1000).await.unwrap();
1070        daser.update_number_of_prunable_blocks(1000).await.unwrap();
1071
1072        handle.expect_no_cmd().await;
1073        handle.announce_peer_connected();
1074
1075        // Blocks that are currently stored are ratelimited, so we shouldn't get any command.
1076        handle.expect_no_cmd().await;
1077
1078        // Any blocks above 1000 are not limited because they are not in the prunable area
1079        gen_and_sample_block(
1080            &mut handle,
1081            &mut generator,
1082            &store,
1083            &mut event_sub,
1084            2,
1085            false,
1086        )
1087        .await;
1088        gen_and_sample_block(
1089            &mut handle,
1090            &mut generator,
1091            &store,
1092            &mut event_sub,
1093            2,
1094            false,
1095        )
1096        .await;
1097
1098        // Again back to ratelimited
1099        handle.expect_no_cmd().await;
1100
1101        // Now Pruner reports that the number of prunable blocks is lower than the threshold
1102        daser
1103            .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
1104            .await
1105            .unwrap();
1106        sleep(Duration::from_millis(5)).await;
1107
1108        // Sample blocks that are in the prunable area
1109        sample_block(
1110            &mut handle,
1111            &store,
1112            &mut event_sub,
1113            edses.get(&1000).unwrap(),
1114            1000,
1115            false,
1116        )
1117        .await;
1118        sample_block(
1119            &mut handle,
1120            &store,
1121            &mut event_sub,
1122            edses.get(&999).unwrap(),
1123            999,
1124            false,
1125        )
1126        .await;
1127
1128        // Now Pruner reports that the number of prunable blocks reached the threshold again
1129        daser
1130            .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
1131            .await
1132            .unwrap();
1133        sleep(Duration::from_millis(5)).await;
1134
1135        // But sampling of block 998 was already scheduled, so we need to handle it
1136        sample_block(
1137            &mut handle,
1138            &store,
1139            &mut event_sub,
1140            edses.get(&998).unwrap(),
1141            998,
1142            false,
1143        )
1144        .await;
1145
1146        // Daser is ratelimited again
1147        handle.expect_no_cmd().await;
1148    }
1149
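    /// Respond with a Bitswap timeout to every pending sample request for `height`,
    /// ending the ongoing sampling of that block.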
1150    fn stop_sampling_for(
1151        responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
1152        height: u64,
1153    ) {
1154        let mut indexes = Vec::new();
1155
1156        for (idx, (cid, _)) in responders.iter().enumerate() {
1157            let sample_id: SampleId = cid.try_into().unwrap();
1158            if sample_id.block_height() == height {
1159                indexes.push(idx)
1160            }
1161        }
1162
1163        for idx in indexes.into_iter().rev() {
1164            let (_cid, respond_to) = responders.remove(idx);
1165            respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
1166        }
1167    }
1168
1169    async fn gen_and_sample_block(
1170        handle: &mut MockP2pHandle,
1171        generator: &mut ExtendedHeaderGenerator,
1172        store: &InMemoryStore,
1173        event_sub: &mut EventSubscriber,
1174        square_width: usize,
1175        simulate_sampling_timeout: bool,
1176    ) {
1177        let eds = generate_dummy_eds(square_width, AppVersion::V2);
1178        let dah = DataAvailabilityHeader::from_eds(&eds);
1179        let header = generator.next_with_dah(dah);
1180        let height = header.height().value();
1181
1182        store.insert(header).await.unwrap();
1183
1184        sample_block(
1185            handle,
1186            store,
1187            event_sub,
1188            &eds,
1189            height,
1190            simulate_sampling_timeout,
1191        )
1192        .await;
1193
1194        // `gen_and_sample_block` is only used in cases where Daser doesn't have
1195        // anything else to schedule.
1196        handle.expect_no_cmd().await;
1197        assert!(event_sub.try_recv().is_err());
1198    }
1199
1200    async fn sample_block(
1201        handle: &mut MockP2pHandle,
1202        store: &InMemoryStore,
1203        event_sub: &mut EventSubscriber,
1204        eds: &ExtendedDataSquare,
1205        height: u64,
1206        simulate_sampling_timeout: bool,
1207    ) {
1208        let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;
1209
1210        // Wait to be sampled
1211        sleep(Duration::from_millis(100)).await;
1212
1213        // Check if block was sampled or timed-out.
1214        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
1215        assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);
1216
1217        // Check if CIDs we requested successfully made it in the store
1218        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
1219        sampling_metadata.cids.sort();
1220        assert_eq!(&sampling_metadata.cids, &cids);
1221
1222        // Check if we received `SamplingStarted` event
1223        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
1224            NodeEvent::SamplingStarted {
1225                height: ev_height,
1226                square_width,
1227                shares,
1228            } => {
1229                assert_eq!(ev_height, height);
1230                assert_eq!(square_width, eds.square_width());
1231
1232                // Make sure the share list matches the CIDs we received
1233                let mut cids = shares
1234                    .iter()
1235                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
1236                    .collect::<Vec<_>>();
1237                cids.sort();
1238                assert_eq!(&sampling_metadata.cids, &cids);
1239
1240                shares.into_iter().collect::<HashSet<_>>()
1241            }
1242            ev => panic!("Unexpected event: {ev}"),
1243        };
1244
1245        // Check if we received `ShareSamplingResult` for each share
1246        for i in 1..=remaining_shares.len() {
1247            match event_sub.try_recv().unwrap().event {
1248                NodeEvent::ShareSamplingResult {
1249                    height: ev_height,
1250                    square_width,
1251                    row,
1252                    column,
1253                    timed_out,
1254                } => {
1255                    assert_eq!(ev_height, height);
1256                    assert_eq!(square_width, eds.square_width());
1257                    assert_eq!(
1258                        timed_out,
1259                        simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
1260                    );
1261                    // Make sure it is in the list and remove it
1262                    assert!(remaining_shares.remove(&(row, column)));
1263                }
1264                ev => panic!("Unexpected event: {ev}"),
1265            }
1266        }
1267
1268        assert!(remaining_shares.is_empty());
1269
1270        // Check if we received `SamplingResult` for the block
1271        match event_sub.try_recv().unwrap().event {
1272            NodeEvent::SamplingResult {
1273                height: ev_height,
1274                timed_out,
1275                took,
1276            } => {
1277                assert_eq!(ev_height, height);
1278                assert_eq!(timed_out, simulate_sampling_timeout);
1279                assert_ne!(took, Duration::default());
1280            }
1281            ev => panic!("Unexpected event: {ev}"),
1282        }
1283    }
1284
1285    /// Responds to get_shwap_cid and returns all CIDs that were requested
1286    async fn handle_concurrent_get_shwap_cid<const N: usize>(
1287        handle: &mut MockP2pHandle,
1288        handling_args: [(u64, &ExtendedDataSquare, bool); N],
1289    ) -> Vec<Cid> {
1290        struct Info<'a> {
1291            eds: &'a ExtendedDataSquare,
1292            simulate_sampling_timeout: bool,
1293            needed_samples: usize,
1294            requests_count: usize,
1295        }
1296
1297        let mut infos = handling_args
1298            .into_iter()
1299            .map(|(height, eds, simulate_sampling_timeout)| {
1300                let square_width = eds.square_width() as usize;
1301                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);
1302
1303                (
1304                    height,
1305                    Info {
1306                        eds,
1307                        simulate_sampling_timeout,
1308                        needed_samples,
1309                        requests_count: 0,
1310                    },
1311                )
1312            })
1313            .collect::<HashMap<_, _>>();
1314
1315        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
1316        let mut cids = Vec::with_capacity(needed_samples_sum);
1317
1318        for _ in 0..needed_samples_sum {
1319            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
1320            cids.push(cid);
1321
1322            let sample_id: SampleId = cid.try_into().unwrap();
1323            let info = infos
1324                .get_mut(&sample_id.block_height())
1325                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));
1326
1327            info.requests_count += 1;
1328
1329            // Simulate sampling timeout
1330            if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
1331                respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
1332                continue;
1333            }
1334
1335            let sample = gen_sample_of_cid(sample_id, info.eds).await;
1336            respond_to.send(Ok(sample)).unwrap();
1337        }
1338
1339        cids.sort();
1340        cids
1341    }
1342
1343    /// Responds to get_shwap_cid and returns all CIDs that were requested
1344    async fn handle_get_shwap_cid(
1345        handle: &mut MockP2pHandle,
1346        height: u64,
1347        eds: &ExtendedDataSquare,
1348        simulate_sampling_timeout: bool,
1349    ) -> Vec<Cid> {
1350        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
1351    }
1352
1353    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
1354        let sample = Sample::new(
1355            sample_id.row_index(),
1356            sample_id.column_index(),
1357            AxisType::Row,
1358            eds,
1359        )
1360        .unwrap();
1361
1362        let mut container = BytesMut::new();
1363        sample.encode(&mut container);
1364
1365        let block = Block {
1366            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
1367            container: container.to_vec(),
1368        };
1369
1370        block.encode_to_vec()
1371    }
1372}