lumina_node/
daser.rs

//! Component responsible for data availability sampling of blocks whose already
//! synchronized headers were announced on the Celestia network.
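//!
//! A minimal usage sketch (illustrative only; the `p2p`, `store`, and `events`
//! values are assumed to be constructed elsewhere in the node):
//!
//! ```ignore
//! let daser = Daser::start(DaserArgs {
//!     p2p,
//!     store,
//!     event_pub: events.publisher(),
//!     sampling_window: SAMPLING_WINDOW,
//!     concurrency_limit: DEFAULT_CONCURENCY_LIMIT,
//!     additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
//! })?;
//!
//! // ... Daser samples new heads and the synchronized backlog ...
//!
//! daser.stop();
//! daser.join().await;
//! ```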

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lumina_utils::executor::{JoinHandle, spawn};
use lumina_utils::time::{Instant, Interval};
use rand::Rng;
use tendermint::Time;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{BlockRanges, Store, StoreError};
use crate::utils::{OneshotSenderExt, TimeExt};

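// Descriptive note on how the constants below are used in this module:
// - `MAX_SAMPLES_NEEDED`: upper bound on the number of random shares sampled per block.
// - `GET_SAMPLE_MIN_TIMEOUT`: lower bound for a single `get_sample` request timeout.
// - `PRUNER_THRESHOLD`: once the Pruner reports at least this many prunable blocks,
//   sampling of blocks inside the prunable area is paused.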
const MAX_SAMPLES_NEEDED: usize = 16;
const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
const PRUNER_THRESHOLD: u64 = 512;

pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;

type Result<T, E = DaserError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur in `Daser` component.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The worker has died.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl From<oneshot::error::RecvError> for DaserError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        DaserError::ChannelClosedUnexpectedly
    }
}

/// Component responsible for data availability sampling of blocks from the network.
pub(crate) struct Daser {
    cmd_tx: mpsc::Sender<DaserCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
}

/// Arguments used to configure the [`Daser`].
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    /// Handler for the peer to peer messaging.
    pub(crate) p2p: Arc<P2p>,
    /// Headers storage.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Size of the sampling window.
    pub(crate) sampling_window: Duration,
    /// How many blocks can be data sampled at the same time.
    pub(crate) concurrency_limit: usize,
    /// How many additional blocks can be data sampled if they are from HeaderSub.
    pub(crate) additional_headersub_concurrency: usize,
}

#[derive(Debug)]
pub(crate) enum DaserCmd {
    UpdateHighestPrunableHeight {
        value: u64,
    },

    UpdateNumberOfPrunableBlocks {
        value: u64,
    },

    /// Used by Pruner to tell Daser about a block that is going to be pruned.
    /// Daser then replies with `true` if Pruner is allowed to do it.
    ///
    /// This is needed to avoid the following race condition:
    ///
    /// `Store` is very tightly integrated with `beetswap::Multihasher`, and when
    /// Daser starts data sampling a block, the header of that block must remain
    /// in the `Store` until the data sampling is finished. This can be fixed
    /// only if we decouple `Store` from `beetswap::Multihasher`.
    ///
    /// However, even if we fix the above, a second problem arises: when Pruner
    /// removes the header and samples of an ongoing data sampling, how are we
    /// going to handle the incoming CIDs? We need to somehow make sure that
    /// Pruner removes them only after sampling is finished.
    ///
    /// After the above issues are fixed, this command can be removed.
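    ///
    /// A minimal sketch of the intended handshake on the Pruner side
    /// (illustrative only; `prune_block` is a placeholder for the Pruner's
    /// own removal logic, not an API of this crate):
    ///
    /// ```ignore
    /// if daser.want_to_prune(height).await? {
    ///     // Safe to prune: Daser is not sampling this block and will not
    ///     // re-queue it (it was added to `will_be_pruned`).
    ///     prune_block(height).await?;
    /// } else {
    ///     // Sampling is ongoing; retry this height later.
    /// }
    /// ```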
    WantToPrune {
        height: u64,
        respond_to: oneshot::Sender<bool>,
    },
}

impl Daser {
    /// Create and start the [`Daser`].
    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
    where
        S: Store + 'static,
    {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Daser stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalDaserError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        })
    }

    #[cfg(test)]
    pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let cancellation_token = CancellationToken::new();

        // Just a fake join_handle
        let join_handle = spawn(async {});

        let daser = Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        };

        let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };

        (daser, mock_handle)
    }

    /// Stop the worker.
    pub(crate) fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }

    /// Wait until worker is completely stopped.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| DaserError::WorkerDied)
    }

    pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
        let (tx, rx) = oneshot::channel();

        self.send_command(DaserCmd::WantToPrune {
            height,
            respond_to: tx,
        })
        .await?;

        Ok(rx.await?)
    }

    pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
            .await
    }

    pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
            .await
    }
}

impl Drop for Daser {
    fn drop(&mut self) {
        self.stop();
    }
}

struct Worker<S>
where
    S: Store + 'static,
{
    cmd_rx: mpsc::Receiver<DaserCmd>,
    cancellation_token: CancellationToken,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    max_samples_needed: usize,
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
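    // Descriptive note on the `BlockRanges` bookkeeping below: `queue` holds blocks
    // still waiting to be sampled, `timed_out` holds blocks whose sampling timed out
    // or that fell outside the sampling window, `ongoing` holds blocks currently
    // being sampled, and `will_be_pruned` holds blocks the Pruner was allowed to remove.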
    queue: BlockRanges,
    timed_out: BlockRanges,
    ongoing: BlockRanges,
    will_be_pruned: BlockRanges,
    sampling_window: Duration,
    concurrency_limit: usize,
    additional_headersub_concurency: usize,
    head_height: Option<u64>,
    highest_prunable_height: Option<u64>,
    num_of_prunable_blocks: u64,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: DaserArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<DaserCmd>,
    ) -> Result<Worker<S>> {
        Ok(Worker {
            cmd_rx,
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            timed_out: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            will_be_pruned: BlockRanges::default(),
            sampling_window: args.sampling_window,
            concurrency_limit: args.concurrency_limit,
            additional_headersub_concurency: args.additional_headersub_concurrency,
            head_height: None,
            highest_prunable_height: None,
            num_of_prunable_blocks: 0,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if connection status changed before watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
            }
        }
    }

    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if connection status changed before the watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        // Workaround because `wait_new_head` is not cancel-safe.
        //
        // TODO: Only Syncer adds new headers to the store, so ideally
        // Syncer should inform Daser that new headers were added.
        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.update_queue().await?;

        let mut first_report = true;

        loop {
            // Start as many data samplings as we are allowed.
            while self.schedule_next_sample_block().await? {}

            if first_report {
                self.report().await?;
                first_report = false;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => self.report().await?,
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
                Some(res) = self.sampling_futs.next() => {
                    // Beetswap only returns fatal errors that are not related
                    // to P2P or networking.
                    let (height, timed_out) = res?;

                    if timed_out {
                        self.timed_out.insert_relaxed(height..=height).expect("invalid height");
                    } else {
                        self.store.mark_as_sampled(height).await?;
                    }

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                },
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.update_queue().await?;
                }
            }
        }

        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.timed_out = BlockRanges::default();
        self.head_height = None;

        Ok(())
    }

    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let sampled = self.store.get_sampled_ranges().await?;

        info!(
            "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
            sampled, &self.ongoing,
        );

        Ok(())
    }

    async fn on_cmd(&mut self, cmd: DaserCmd) {
        match cmd {
            DaserCmd::WantToPrune { height, respond_to } => {
                let res = self.on_want_to_prune(height).await;
                respond_to.maybe_send(res);
            }
            DaserCmd::UpdateHighestPrunableHeight { value } => {
                self.highest_prunable_height = Some(value);
            }
            DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
                self.num_of_prunable_blocks = value;
            }
        }
    }

    async fn on_want_to_prune(&mut self, height: u64) -> bool {
        // Pruner should not remove headers that are related to an ongoing sampling.
        if self.ongoing.contains(height) {
            return false;
        }

        // Header will be pruned, so we remove it from the queue to avoid race conditions.
        self.queue
            .remove_relaxed(height..=height)
            .expect("invalid height");
        // We also make sure `update_queue` will not put it back.
        self.will_be_pruned
            .insert_relaxed(height..=height)
            .expect("invalid height");

        true
    }

    async fn schedule_next_sample_block(&mut self) -> Result<bool> {
        // Schedule the most recent unsampled block.
        let header = loop {
            let Some(height) = self.queue.pop_head() else {
                return Ok(false);
            };

            let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
                && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
            {
                // If a block is in the prunable area and Pruner has a lot of blocks
                // to prune, then we pause sampling.
                0
            } else if height == self.head_height.unwrap_or(0) {
                // For head we allow additional concurrency
                self.concurrency_limit + self.additional_headersub_concurency
            } else {
                self.concurrency_limit
            };

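            // Descriptive note: with the crate defaults this means up to
            // `DEFAULT_CONCURENCY_LIMIT` (3) blocks sampled concurrently, or
            // 3 + `DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY` (5) = 8 when the
            // popped height is the current head. Actual limits come from `DaserArgs`.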
            if self.sampling_futs.len() >= concurrency_limit {
                // If concurrency limit is reached, then we pause sampling
                // and we put back the height we previously popped.
                self.queue
                    .insert_relaxed(height..=height)
                    .expect("invalid height");
                return Ok(false);
            }

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    // Height was pruned and our queue is inconsistent.
                    // Repopulate queue and try again.
                    self.update_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height();
        let square_width = header.square_width();

        // Make sure that the block is still in the sampling window.
        if !self.in_sampling_window(header.time()) {
            // As soon as we reach a block that is not in the sampling
            // window, it means the rest wouldn't be either.
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.timed_out
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(false);
        }

        // Select random shares to be sampled
        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        // Update the CID list before we start sampling, otherwise it's possible for us
        // to leak CIDs causing associated blocks to never get cleaned from blockstore.
        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;
        self.store.update_sampling_metadata(height, cids).await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();
        let sampling_window = self.sampling_window;

        // Schedule retrieval of the CIDs. This will be run later on in the `select!` loop.
        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            // We set the timeout high enough to last until the block
            // goes out of the sampling window.
            let timeout = calc_timeout(header.time(), Time::now(), sampling_window);

            // Initialize all futures
            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p.get_sample(row, col, height, Some(timeout)).await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut sampling_timed_out = false;

            // Run futures to completion
            while let Some((row, column, res)) = futs.next().await {
                let timed_out = match res {
                    Ok(_) => false,
                    // Validation is done at Bitswap level, through `ShwapMultihasher`.
                    // If the sample is not valid, it will never be delivered to us
                    // as the data of the CID. Because of that, the only signal
                    // that data sampling verification failed is query timing out.
                    Err(P2pError::RequestTimedOut) => true,
                    Err(e) => return Err(e.into()),
                };

                if timed_out {
                    sampling_timed_out = true;
                }

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    timed_out,
                });
            }

            event_pub.send(NodeEvent::SamplingResult {
                height,
                timed_out: sampling_timed_out,
                took: now.elapsed(),
            });

            Ok((height, sampling_timed_out))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(true)
    }

    /// Add to the queue the blocks that need to be sampled.
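    ///
    /// The queue is derived purely by range arithmetic. A minimal sketch with
    /// illustrative values, using the same `BlockRanges` API as the code below:
    ///
    /// ```ignore
    /// let mut stored = BlockRanges::default();
    /// stored.insert_relaxed(1..=10).unwrap();
    /// let mut sampled = BlockRanges::default();
    /// sampled.insert_relaxed(1..=3).unwrap();
    /// // Everything stored but not yet sampled remains to be queued: 4..=10.
    /// // The real code also subtracts `timed_out`, `ongoing` and `will_be_pruned`.
    /// let queue = stored - &sampled;
    /// ```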
    async fn update_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let sampled = self.store.get_sampled_ranges().await?;

        self.head_height = stored.head();
        self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;

        Ok(())
    }

    /// Returns true if `time` is within the sampling window.
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        // Header is from the future! Thus, within sampling window.
        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}

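/// Timeout for fetching a single sample of a block.
///
/// The timeout stretches until the moment the block would leave the sampling
/// window, but never drops below [`GET_SAMPLE_MIN_TIMEOUT`]. A couple of worked
/// examples (illustrative numbers, mirroring the unit tests below):
///
/// ```ignore
/// // window = 60s, header 49s old  =>  timeout = 60s - 49s = 11s
/// // window = 60s, header 55s old  =>  5s is below the 10s minimum  =>  10s
/// let timeout = calc_timeout(header.time(), Time::now(), sampling_window);
/// ```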
fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
    let sampling_window_end = now.saturating_sub(sampling_window);

    header_time
        .duration_since(sampling_window_end)
        .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
        .max(GET_SAMPLE_MIN_TIMEOUT)
}

/// Returns unique and random indexes that will be used for sampling.
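///
/// A small illustration (values chosen only for this doc): for `square_width = 2`
/// and `max_samples_needed = 16` every coordinate of the 2x2 square is returned,
/// while for `square_width = 8` the result is 16 unique random `(row, col)` pairs.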
fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
    let samples_in_block = usize::from(square_width).pow(2);

    // If block size is smaller than `max_samples_needed`, we are going
    // to sample the whole block. Randomness is not needed for this.
    if samples_in_block <= max_samples_needed {
        return (0..square_width)
            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
            .collect();
    }

    let mut indexes = HashSet::with_capacity(max_samples_needed);
    let mut rng = rand::thread_rng();

    while indexes.len() < max_samples_needed {
        let row = rng.r#gen::<u16>() % square_width;
        let col = rng.r#gen::<u16>() % square_width;
        indexes.insert((row, col));
    }

    indexes
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{EventChannel, EventSubscriber};
    use crate::node::SAMPLING_WINDOW;
    use crate::p2p::P2pCmd;
    use crate::p2p::shwap::convert_cid;
    use crate::store::InMemoryStore;
    use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
    use crate::utils::OneshotResultSender;
    use bytes::BytesMut;
    use celestia_proto::bitswap::Block;
    use celestia_types::consts::appconsts::AppVersion;
    use celestia_types::sample::{Sample, SampleId};
    use celestia_types::test_utils::{ExtendedHeaderGenerator, generate_dummy_eds};
    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
    use cid::Cid;
    use lumina_utils::test_utils::async_test;
    use lumina_utils::time::sleep;
    use prost::Message;
    use std::collections::HashMap;
    use std::time::Duration;

    // Request number for which tests will simulate sampling timeout
    //
    // NOTE: The smallest block has 4 shares, so a 2nd request will always happen.
    const REQ_TIMEOUT_SHARE_NUM: usize = 2;

    #[async_test]
    async fn check_calc_timeout() {
        let now = Time::now();
        let sampling_window = Duration::from_secs(60);

        let header_time = now;
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(60)
        );

        let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(59)
        );

        let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(11)
        );

        let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(10)
        );

        // minimum timeout edge 1
        let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // minimum timeout edge 2
        let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // header outside of the sampling window
        let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // header from the "future"
        let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(61)
        );
    }

    #[async_test]
    async fn received_valid_samples() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            4,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            8,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            16,
            false,
        )
        .await;
    }

    #[async_test]
    async fn sampling_timeout() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(&mut handle, &mut generator, &store, &mut event_sub, 4, true).await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            8,
            false,
        )
        .await;
    }

    #[async_test]
    async fn backward_dasing() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        let mut edses = Vec::new();
        let mut headers = Vec::new();

        for _ in 0..20 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = generator.next_with_dah(dah);

            edses.push(eds);
            headers.push(header);
        }

        // Insert 5-10 block headers
        store.insert(headers[4..=9].to_vec()).await.unwrap();

        // Sample block 10
        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;

        // Sample block 9
        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;

        // To avoid race conditions we wait a bit for the block 8 to be scheduled
        sleep(Duration::from_millis(10)).await;

        // Insert 16-20 block headers
        store.insert(headers[15..=19].to_vec()).await.unwrap();

        // To avoid race conditions we wait a bit for the new head (block 20) to be scheduled
        sleep(Duration::from_millis(10)).await;

        // Now daser runs two concurrent data samplings: block 8 and block 20
        handle_concurrent_get_shwap_cid(
            &mut handle,
            [(8, &edses[7], false), (20, &edses[19], false)],
        )
        .await;

        // Sample and reject block 19
        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;

        // Simulate disconnection
        handle.announce_all_peers_disconnected();

        // Daser may have scheduled block 18 already, so we need to reply to those requests.
        // For the sake of the test we reply with a bitswap timeout.
        while let Some(cmd) = handle.try_recv_cmd().await {
            match cmd {
                P2pCmd::GetShwapCid { respond_to, .. } => {
                    let _ = respond_to.send(Err(P2pError::RequestTimedOut));
                }
                cmd => panic!("Unexpected command: {cmd:?}"),
            }
        }

        // We shouldn't have any other requests from daser because it's in connecting state.
        handle.expect_no_cmd().await;

        // Simulate that a peer connected
        handle.announce_peer_connected();

        // Because of disconnection and previous rejection of block 19, daser will resample it
        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;

        // Sample blocks 18 down to 16
        for height in (16..=18).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        // Sample blocks 7 down to 5
        for height in (5..=7).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        handle.expect_no_cmd().await;

        // Push block 21 in the store
        let eds = generate_dummy_eds(2, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = generator.next_with_dah(dah);
        store.insert(header).await.unwrap();

        // Sample block 21
        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;

        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn concurrency_limits() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        // Concurrency limit
        let concurrency_limit = 10;
        // Additional concurrency limit for heads.
        // In other words concurrency limit becomes 15.
        let additional_headersub_concurrency = 5;
        // Default number of shares that ExtendedHeaderGenerator
        // generates per block.
        let shares_per_block = 4;

        let mut generator = ExtendedHeaderGenerator::new();
        store
            .insert(generator.next_many_empty_verified(30))
            .await
            .unwrap();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit,
            additional_headersub_concurrency,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        let mut hold_respond_channels = Vec::new();

        for _ in 0..(concurrency_limit * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit reached
        handle.expect_no_cmd().await;

        // However, a new head will be allowed because additional limit is applied
        store
            .insert(generator.next_many_empty_verified(2))
            .await
            .unwrap();

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }
        handle.expect_no_cmd().await;

        // Now 11 blocks are ongoing. In order for Daser to schedule the next
        // one, 2 blocks need to finish.
        // We stop sampling for blocks 29 and 30 in order to simulate this.
        stop_sampling_for(&mut hold_respond_channels, 29);
        stop_sampling_for(&mut hold_respond_channels, 30);

        // Now Daser will schedule the next block.
        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // And... concurrency limit is reached again.
        handle.expect_no_cmd().await;

        // Generate 5 more heads
        for _ in 0..additional_headersub_concurrency {
            store
                .insert(generator.next_many_empty_verified(1))
                .await
                .unwrap();
            // Give some time for Daser to schedule it
            sleep(Duration::from_millis(10)).await;
        }

        for _ in 0..(additional_headersub_concurrency * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit for heads is reached
        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;

        // Now we stop 1 block and Daser will schedule the head
        // we generated above.
        stop_sampling_for(&mut hold_respond_channels, 28);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit for heads is reached again
        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn ratelimit() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: Duration::from_secs(60),
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        let now = Time::now();
        let first_header_time = (now - Duration::from_secs(1024)).unwrap();
        generator.set_time(first_header_time, Duration::from_secs(1));
        store.insert(generator.next_many(990)).await.unwrap();

        let mut edses = HashMap::new();

        for height in 991..=1000 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = generator.next_with_dah(dah);

            edses.insert(height, eds);
            store.insert(header).await.unwrap();
        }

        // Assume that Pruner reports the following
        daser.update_highest_prunable_block(1000).await.unwrap();
        daser.update_number_of_prunable_blocks(1000).await.unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        // Blocks that are currently stored are rate-limited, so we shouldn't get any command.
        handle.expect_no_cmd().await;

        // Any blocks above 1000 are not limited because they are not in prunable area
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;

        // Again back to rate-limited
        handle.expect_no_cmd().await;

        // Now Pruner reports that number of prunable blocks is lower than the threshold
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        // Sample blocks that are in prunable area
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&1000).unwrap(),
            1000,
            false,
        )
        .await;
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&999).unwrap(),
            999,
            false,
        )
        .await;

        // Now pruner reports prunable blocks reached the threshold again
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        // But sampling of block 998 was already scheduled, so we need to handle it
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&998).unwrap(),
            998,
            false,
        )
        .await;

        // Daser is rate-limited again
        handle.expect_no_cmd().await;
    }

    fn stop_sampling_for(
        responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
        height: u64,
    ) {
        let mut indexes = Vec::new();

        for (idx, (cid, _)) in responders.iter().enumerate() {
            let sample_id: SampleId = cid.try_into().unwrap();
            if sample_id.block_height() == height {
                indexes.push(idx)
            }
        }

        for idx in indexes.into_iter().rev() {
            let (_cid, respond_to) = responders.remove(idx);
            respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
        }
    }

    async fn gen_and_sample_block(
        handle: &mut MockP2pHandle,
        generator: &mut ExtendedHeaderGenerator,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        square_width: usize,
        simulate_sampling_timeout: bool,
    ) {
        let eds = generate_dummy_eds(square_width, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = generator.next_with_dah(dah);
        let height = header.height();

        store.insert(header).await.unwrap();

        sample_block(
            handle,
            store,
            event_sub,
            &eds,
            height,
            simulate_sampling_timeout,
        )
        .await;

        // `gen_and_sample_block` is only used in cases where Daser doesn't have
        // anything else to schedule.
        handle.expect_no_cmd().await;
        assert!(event_sub.try_recv().is_err());
    }

    async fn sample_block(
        handle: &mut MockP2pHandle,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        eds: &ExtendedDataSquare,
        height: u64,
        simulate_sampling_timeout: bool,
    ) {
        let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;

        // Wait to be sampled
        sleep(Duration::from_millis(100)).await;

        // Check if block was sampled or timed-out.
        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
        assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);

        // Check if CIDs we requested successfully made it in the store
        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();
        assert_eq!(&sampling_metadata.cids, &cids);

        // Check if we received `SamplingStarted` event
        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                // Make sure the share list matches the CIDs we received
                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        // Check if we received `ShareSamplingResult` for each share
        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    timed_out,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    assert_eq!(
                        timed_out,
                        simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
                    );
                    // Make sure it is in the list and remove it
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        assert!(remaining_shares.is_empty());

        // Check if we received `SamplingResult` for the block
        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingResult {
                height: ev_height,
                timed_out,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(timed_out, simulate_sampling_timeout);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }
    }

    /// Responds to get_shwap_cid and returns all CIDs that were requested
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_sampling_timeout: bool,
            needed_samples: usize,
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_sampling_timeout)| {
                let square_width = eds.square_width() as usize;
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_sampling_timeout,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            // Simulate sampling timeout
            if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
                respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        cids.sort();
        cids
    }

    /// Responds to get_shwap_cid and returns all CIDs that were requested
    async fn handle_get_shwap_cid(
        handle: &mut MockP2pHandle,
        height: u64,
        eds: &ExtendedDataSquare,
        simulate_sampling_timeout: bool,
    ) -> Vec<Cid> {
        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
    }

    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
        let sample = Sample::new(
            sample_id.row_index(),
            sample_id.column_index(),
            AxisType::Row,
            eds,
        )
        .unwrap();

        let mut container = BytesMut::new();
        sample.encode(&mut container);

        let block = Block {
            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
            container: container.to_vec(),
        };

        block.encode_to_vec()
    }
}