// lumina_node/daser.rs
1//! Component responsible for data availability sampling of the already synchronized block
2//! headers announced on the Celestia network.
3
4use std::collections::HashSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::future::BoxFuture;
9use futures::stream::FuturesUnordered;
10use futures::{FutureExt, StreamExt};
11use lumina_utils::executor::{JoinHandle, spawn};
12use lumina_utils::time::{Instant, Interval};
13use rand::Rng;
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::events::{EventPublisher, NodeEvent};
21use crate::p2p::shwap::sample_cid;
22use crate::p2p::{P2p, P2pError};
23use crate::store::{BlockRanges, Store, StoreError};
24use crate::utils::{OneshotSenderExt, TimeExt};
25
26const MAX_SAMPLES_NEEDED: usize = 16;
27const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
28const PRUNER_THRESHOLD: u64 = 512;
29
30pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
31pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;
32
33type Result<T, E = DaserError> = std::result::Result<T, E>;
34
/// Representation of all the errors that can occur in `Daser` component.
///
/// Any of these errors is fatal for the worker: `Daser::start` logs it and
/// broadcasts it as `NodeEvent::FatalDaserError` before the task exits.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The worker has died.
    ///
    /// Returned when the command channel to the worker task is closed.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    ///
    /// Returned when a oneshot response channel is dropped before replying.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}
54
55impl From<oneshot::error::RecvError> for DaserError {
56    fn from(_value: oneshot::error::RecvError) -> Self {
57        DaserError::ChannelClosedUnexpectedly
58    }
59}
60
/// Component responsible for data availability sampling of blocks from the network.
pub(crate) struct Daser {
    /// Sending side of the command channel to the [`Worker`] task.
    cmd_tx: mpsc::Sender<DaserCmd>,
    /// Cancelling this token signals the worker to stop (see [`Daser::stop`]).
    cancellation_token: CancellationToken,
    /// Handle of the spawned worker task; awaited by [`Daser::join`].
    join_handle: JoinHandle,
}
67
/// Arguments used to configure the [`Daser`].
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    /// Handler for the peer to peer messaging.
    pub(crate) p2p: Arc<P2p>,
    /// Headers storage.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Size of the sampling window.
    ///
    /// Blocks older than this are considered out of the sampling window
    /// and are not sampled.
    pub(crate) sampling_window: Duration,
    /// How many blocks can be data sampled at the same time.
    pub(crate) concurrency_limit: usize,
    /// How many additional blocks can be data sampled if they are from HeaderSub.
    ///
    /// This extra allowance applies only to the current network head.
    pub(crate) additional_headersub_concurrency: usize,
}
86
/// Commands handled by the [`Daser`] worker task.
#[derive(Debug)]
pub(crate) enum DaserCmd {
    /// Inform Daser of the highest block height that the Pruner may remove.
    ///
    /// Sampling of blocks at or below this height is paused while the
    /// number of prunable blocks is at least `PRUNER_THRESHOLD`.
    UpdateHighestPrunableHeight {
        value: u64,
    },

    /// Inform Daser how many blocks are currently awaiting pruning.
    UpdateNumberOfPrunableBlocks {
        value: u64,
    },

    /// Used by Pruner to tell Daser about a block that is going to be pruned.
    /// Daser then replies with `true` if Pruner is allowed to do it.
    ///
    /// This is needed to avoid following race condition:
    ///
    /// We have `Store` very tightly integrated with `beetswap::Multihasher`
    /// and when Daser starts data sampling the header of that block must be
    /// in the `Store` until the data sampling is finished. This can be fixed
    /// only if we decouple `Store` from `beetswap::Multihasher`.
    ///
    /// However, even if we fix the above, a second problem arise: When Pruner
    /// removes the header and samples of an ongoing data sampling, how are we
    /// going to handle the incoming CIDs? We need somehow make sure that Pruner
    /// will remove them after sampling is finished.
    ///
    /// After the above issues are fixed, this can be removed.
    WantToPrune {
        height: u64,
        respond_to: oneshot::Sender<bool>,
    },
}
118
119impl Daser {
120    /// Create and start the [`Daser`].
121    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
122    where
123        S: Store + 'static,
124    {
125        let cancellation_token = CancellationToken::new();
126        let event_pub = args.event_pub.clone();
127        let (cmd_tx, cmd_rx) = mpsc::channel(16);
128        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;
129
130        let join_handle = spawn(async move {
131            if let Err(e) = worker.run().await {
132                error!("Daser stopped because of a fatal error: {e}");
133
134                event_pub.send(NodeEvent::FatalDaserError {
135                    error: e.to_string(),
136                });
137            }
138        });
139
140        Ok(Daser {
141            cmd_tx,
142            cancellation_token,
143            join_handle,
144        })
145    }
146
147    #[cfg(test)]
148    pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
149        let (cmd_tx, cmd_rx) = mpsc::channel(16);
150        let cancellation_token = CancellationToken::new();
151
152        // Just a fake join_handle
153        let join_handle = spawn(async {});
154
155        let daser = Daser {
156            cmd_tx,
157            cancellation_token,
158            join_handle,
159        };
160
161        let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };
162
163        (daser, mock_handle)
164    }
165
166    /// Stop the worker.
167    pub(crate) fn stop(&self) {
168        // Signal the Worker to stop.
169        self.cancellation_token.cancel();
170    }
171
172    /// Wait until worker is completely stopped.
173    pub(crate) async fn join(&self) {
174        self.join_handle.join().await;
175    }
176
177    async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
178        self.cmd_tx
179            .send(cmd)
180            .await
181            .map_err(|_| DaserError::WorkerDied)
182    }
183
184    pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
185        let (tx, rx) = oneshot::channel();
186
187        self.send_command(DaserCmd::WantToPrune {
188            height,
189            respond_to: tx,
190        })
191        .await?;
192
193        Ok(rx.await?)
194    }
195
196    pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
197        self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
198            .await
199    }
200
201    pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
202        self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
203            .await
204    }
205}
206
impl Drop for Daser {
    fn drop(&mut self) {
        // Only signals cancellation; the worker task finishes asynchronously.
        // Callers that need to wait for it should use `join` before dropping.
        self.stop();
    }
}
212
/// Background task doing the actual data availability sampling for [`Daser`].
struct Worker<S>
where
    S: Store + 'static,
{
    /// Receiving side of the command channel from the [`Daser`] handle.
    cmd_rx: mpsc::Receiver<DaserCmd>,
    /// Child of the token held by [`Daser`]; fires when `Daser::stop` is called.
    cancellation_token: CancellationToken,
    /// Publisher of node events (sampling started/results, fatal errors).
    event_pub: EventPublisher,
    /// Handler for the peer to peer messaging.
    p2p: Arc<P2p>,
    /// Headers storage.
    store: Arc<S>,
    /// Upper bound on the number of shares sampled per block (`MAX_SAMPLES_NEEDED`).
    max_samples_needed: usize,
    /// In-flight sampling tasks, each resolving to `(height, timed_out)`.
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
    /// Heights waiting to be sampled; popped from the head (most recent first).
    queue: BlockRanges,
    /// Heights whose sampling timed out or that fell out of the sampling window.
    timed_out: BlockRanges,
    /// Heights that are currently being sampled.
    ongoing: BlockRanges,
    /// Heights the Pruner was allowed to remove; never re-queued.
    will_be_pruned: BlockRanges,
    /// Size of the sampling window.
    sampling_window: Duration,
    /// How many blocks can be data sampled at the same time.
    concurrency_limit: usize,
    /// Extra concurrency allowed when sampling the current head.
    additional_headersub_concurency: usize,
    /// Height of the newest stored header, if known.
    head_height: Option<u64>,
    /// Latest value from `DaserCmd::UpdateHighestPrunableHeight`.
    highest_prunable_height: Option<u64>,
    /// Latest value from `DaserCmd::UpdateNumberOfPrunableBlocks`.
    num_of_prunable_blocks: u64,
}
235
impl<S> Worker<S>
where
    S: Store,
{
    /// Create a new `Worker` with empty sampling state.
    ///
    /// The queue and the ongoing/timed-out/will-be-pruned ranges start empty;
    /// they are populated by [`Worker::update_queue`] once the worker connects.
    fn new(
        args: DaserArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<DaserCmd>,
    ) -> Result<Worker<S>> {
        Ok(Worker {
            cmd_rx,
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            timed_out: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            will_be_pruned: BlockRanges::default(),
            sampling_window: args.sampling_window,
            concurrency_limit: args.concurrency_limit,
            additional_headersub_concurency: args.additional_headersub_concurrency,
            head_height: None,
            highest_prunable_height: None,
            num_of_prunable_blocks: 0,
        })
    }

    /// Worker's main loop.
    ///
    /// Alternates between the connecting loop (no peers; only serves commands)
    /// and the connected loop (actual sampling) until the token is cancelled.
    /// Any `Err` from the connected loop is fatal and propagates to the caller.
    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await;

            // Re-check: `connecting_event_loop` also returns when the token
            // fires, not only when a peer connects.
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

    /// Wait until at least one peer is connected, while still serving
    /// `DaserCmd` messages so callers (e.g. Pruner) are never blocked on us.
    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if connection status changed before watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
            }
        }
    }

    /// Run data sampling until all peers disconnect or the worker is cancelled.
    ///
    /// Returns `Ok(())` on disconnection/cancellation; errors are fatal.
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if connection status changed before the watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        // Workaround because `wait_new_head` is not cancel-safe.
        //
        // TODO: Only Syncer add new headers to the store, so ideally
        // Syncer should inform Daser that new headers were added.
        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.update_queue().await?;

        let mut first_report = true;

        loop {
            // Start as many data sampling we are allowed.
            while self.schedule_next_sample_block().await? {}

            if first_report {
                self.report().await?;
                first_report = false;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => self.report().await?,
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
                Some(res) = self.sampling_futs.next() => {
                    // Beetswap only returns fatal errors that are not related
                    // to P2P nor networking.
                    let (height, timed_out) = res?;

                    if timed_out {
                        self.timed_out.insert_relaxed(height..=height).expect("invalid height");
                    } else {
                        self.store.mark_as_sampled(height).await?;
                    }

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                },
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.update_queue().await?;
                }
            }
        }

        // Sampling state is tied to this connectivity session: drop in-flight
        // futures and reset the ranges so the next session starts fresh from
        // `update_queue`. Clearing `timed_out` means previously failed blocks
        // will be re-sampled after a reconnection.
        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.timed_out = BlockRanges::default();
        self.head_height = None;

        Ok(())
    }

    /// Log a summary of sampling progress.
    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let sampled = self.store.get_sampled_ranges().await?;

        info!(
            "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
            sampled, &self.ongoing,
        );

        Ok(())
    }

    /// Dispatch a single command received from the [`Daser`] handle.
    async fn on_cmd(&mut self, cmd: DaserCmd) {
        match cmd {
            DaserCmd::WantToPrune { height, respond_to } => {
                let res = self.on_want_to_prune(height).await;
                // NOTE(review): `maybe_send` presumably tolerates a dropped
                // receiver (best-effort reply) — confirm in `OneshotSenderExt`.
                respond_to.maybe_send(res);
            }
            DaserCmd::UpdateHighestPrunableHeight { value } => {
                self.highest_prunable_height = Some(value);
            }
            DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
                self.num_of_prunable_blocks = value;
            }
        }
    }

    /// Decide whether the Pruner is allowed to remove the block at `height`.
    ///
    /// Returns `false` only if that block is currently being sampled;
    /// otherwise the height is excluded from (re-)queueing and `true` is returned.
    async fn on_want_to_prune(&mut self, height: u64) -> bool {
        // Pruner should not remove headers that are related to an ongoing sampling.
        if self.ongoing.contains(height) {
            return false;
        }

        // Header will be pruned, so we remove it from the queue to avoid race conditions.
        self.queue
            .remove_relaxed(height..=height)
            .expect("invalid height");
        // We also make sure `update_queue` will not put it back.
        self.will_be_pruned
            .insert_relaxed(height..=height)
            .expect("invalid height");

        true
    }

    /// Try to start data sampling of the next block from the queue.
    ///
    /// Returns `Ok(false)` when nothing was scheduled: the queue is empty,
    /// the applicable concurrency limit is reached, or the next block has
    /// fallen out of the sampling window. Returns `Ok(true)` after pushing
    /// a new sampling future onto `sampling_futs`.
    async fn schedule_next_sample_block(&mut self) -> Result<bool> {
        // Schedule the most recent unsampled block.
        let header = loop {
            let Some(height) = self.queue.pop_head() else {
                return Ok(false);
            };

            let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
                && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
            {
                // If a block is in the prunable area and Pruner has a lot of blocks
                // to prune, then we pause sampling.
                0
            } else if height == self.head_height.unwrap_or(0) {
                // For head we allow additional concurrency
                self.concurrency_limit + self.additional_headersub_concurency
            } else {
                self.concurrency_limit
            };

            if self.sampling_futs.len() >= concurrency_limit {
                // If concurrency limit is reached, then we pause sampling
                // and we put back the height we previously popped.
                self.queue
                    .insert_relaxed(height..=height)
                    .expect("invalid height");
                return Ok(false);
            }

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    // Height was pruned and our queue is inconsistent.
                    // Repopulate queue and try again.
                    self.update_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height();
        let square_width = header.square_width();

        // Make sure that the block is still in the sampling window.
        if !self.in_sampling_window(header.time()) {
            // As soon as we reach a block that is not in the sampling
            // window, it means the rest wouldn't be either.
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.timed_out
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(false);
        }

        // Select random shares to be sampled
        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        // Update the CID list before we start sampling, otherwise it's possible for us
        // to leak CIDs causing associated blocks to never get cleaned from blockstore.
        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;
        self.store.update_sampling_metadata(height, cids).await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();
        let sampling_window = self.sampling_window;

        // Schedule retrieval of the CIDs. This will be run later on in the `select!` loop.
        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            // We set the timeout high enough until block goes out of sampling window.
            let timeout = calc_timeout(header.time(), Time::now(), sampling_window);

            // Initialize all futures
            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p.get_sample(row, col, height, Some(timeout)).await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut sampling_timed_out = false;

            // Run futures to completion
            while let Some((row, column, res)) = futs.next().await {
                let timed_out = match res {
                    Ok(_) => false,
                    // Validation is done at Bitswap level, through `ShwapMultihasher`.
                    // If the sample is not valid, it will never be delivered to us
                    // as the data of the CID. Because of that, the only signal
                    // that data sampling verification failed is query timing out.
                    Err(P2pError::RequestTimedOut) => true,
                    Err(e) => return Err(e.into()),
                };

                if timed_out {
                    sampling_timed_out = true;
                }

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    timed_out,
                });
            }

            event_pub.send(NodeEvent::SamplingResult {
                height,
                timed_out: sampling_timed_out,
                took: now.elapsed(),
            });

            Ok((height, sampling_timed_out))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(true)
    }

    /// Add to the queue the blocks that need to be sampled.
    ///
    /// The queue is recomputed from scratch: everything stored minus blocks
    /// already sampled, timed out, currently being sampled, or about to be pruned.
    async fn update_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let sampled = self.store.get_sampled_ranges().await?;

        self.head_height = stored.head();
        self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;

        Ok(())
    }

    /// Returns true if `time` is within the sampling window.
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        // Header is from the future! Thus, within sampling window.
        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}
597
598fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
599    let sampling_window_end = now.saturating_sub(sampling_window);
600
601    header_time
602        .duration_since(sampling_window_end)
603        .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
604        .max(GET_SAMPLE_MIN_TIMEOUT)
605}
606
607/// Returns unique and random indexes that will be used for sampling.
608fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
609    let samples_in_block = usize::from(square_width).pow(2);
610
611    // If block size is smaller than `max_samples_needed`, we are going
612    // to sample the whole block. Randomness is not needed for this.
613    if samples_in_block <= max_samples_needed {
614        return (0..square_width)
615            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
616            .collect();
617    }
618
619    let mut indexes = HashSet::with_capacity(max_samples_needed);
620    let mut rng = rand::thread_rng();
621
622    while indexes.len() < max_samples_needed {
623        let row = rng.r#gen::<u16>() % square_width;
624        let col = rng.r#gen::<u16>() % square_width;
625        indexes.insert((row, col));
626    }
627
628    indexes
629}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634    use crate::events::{EventChannel, EventSubscriber};
635    use crate::node::SAMPLING_WINDOW;
636    use crate::p2p::P2pCmd;
637    use crate::p2p::shwap::convert_cid;
638    use crate::store::InMemoryStore;
639    use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
640    use crate::utils::OneshotResultSender;
641    use bytes::BytesMut;
642    use celestia_proto::bitswap::Block;
643    use celestia_types::sample::{Sample, SampleId};
644    use celestia_types::test_utils::{ExtendedHeaderGenerator, generate_dummy_eds};
645    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
646    use cid::Cid;
647    use lumina_utils::test_utils::async_test;
648    use lumina_utils::time::sleep;
649    use prost::Message;
650    use std::collections::HashMap;
651    use std::time::Duration;
652
653    // Request number for which tests will simulate sampling timeout
654    //
655    // NOTE: The smallest block has 4 shares, so a 2nd request will always happen.
656    const REQ_TIMEOUT_SHARE_NUM: usize = 2;
657
658    #[async_test]
659    async fn check_calc_timeout() {
660        let now = Time::now();
661        let sampling_window = Duration::from_secs(60);
662
663        let header_time = now;
664        assert_eq!(
665            calc_timeout(header_time, now, sampling_window),
666            Duration::from_secs(60)
667        );
668
669        let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
670        assert_eq!(
671            calc_timeout(header_time, now, sampling_window),
672            Duration::from_secs(59)
673        );
674
675        let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
676        assert_eq!(
677            calc_timeout(header_time, now, sampling_window),
678            Duration::from_secs(11)
679        );
680
681        let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
682        assert_eq!(
683            calc_timeout(header_time, now, sampling_window),
684            Duration::from_secs(10)
685        );
686
687        // minimum timeout edge 1
688        let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
689        assert_eq!(
690            calc_timeout(header_time, now, sampling_window),
691            GET_SAMPLE_MIN_TIMEOUT
692        );
693
694        // minimum timeout edge 2
695        let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
696        assert_eq!(
697            calc_timeout(header_time, now, sampling_window),
698            GET_SAMPLE_MIN_TIMEOUT
699        );
700
701        // header outside of the sampling window
702        let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
703        assert_eq!(
704            calc_timeout(header_time, now, sampling_window),
705            GET_SAMPLE_MIN_TIMEOUT
706        );
707
708        // header from the "future"
709        let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
710        assert_eq!(
711            calc_timeout(header_time, now, sampling_window),
712            Duration::from_secs(61)
713        );
714    }
715
716    #[async_test]
717    async fn received_valid_samples() {
718        let (mock, mut handle) = P2p::mocked();
719        let store = Arc::new(InMemoryStore::new());
720        let events = EventChannel::new();
721        let mut event_sub = events.subscribe();
722
723        let _daser = Daser::start(DaserArgs {
724            event_pub: events.publisher(),
725            p2p: Arc::new(mock),
726            store: store.clone(),
727            sampling_window: SAMPLING_WINDOW,
728            concurrency_limit: 1,
729            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
730        })
731        .unwrap();
732
733        let mut generator = ExtendedHeaderGenerator::new();
734
735        handle.expect_no_cmd().await;
736        handle.announce_peer_connected();
737        handle.expect_no_cmd().await;
738
739        gen_and_sample_block(
740            &mut handle,
741            &mut generator,
742            &store,
743            &mut event_sub,
744            2,
745            false,
746        )
747        .await;
748        gen_and_sample_block(
749            &mut handle,
750            &mut generator,
751            &store,
752            &mut event_sub,
753            4,
754            false,
755        )
756        .await;
757        gen_and_sample_block(
758            &mut handle,
759            &mut generator,
760            &store,
761            &mut event_sub,
762            8,
763            false,
764        )
765        .await;
766        gen_and_sample_block(
767            &mut handle,
768            &mut generator,
769            &store,
770            &mut event_sub,
771            16,
772            false,
773        )
774        .await;
775    }
776
777    #[async_test]
778    async fn sampling_timeout() {
779        let (mock, mut handle) = P2p::mocked();
780        let store = Arc::new(InMemoryStore::new());
781        let events = EventChannel::new();
782        let mut event_sub = events.subscribe();
783
784        let _daser = Daser::start(DaserArgs {
785            event_pub: events.publisher(),
786            p2p: Arc::new(mock),
787            store: store.clone(),
788            sampling_window: SAMPLING_WINDOW,
789            concurrency_limit: 1,
790            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
791        })
792        .unwrap();
793
794        let mut generator = ExtendedHeaderGenerator::new();
795
796        handle.expect_no_cmd().await;
797        handle.announce_peer_connected();
798        handle.expect_no_cmd().await;
799
800        gen_and_sample_block(
801            &mut handle,
802            &mut generator,
803            &store,
804            &mut event_sub,
805            2,
806            false,
807        )
808        .await;
809        gen_and_sample_block(&mut handle, &mut generator, &store, &mut event_sub, 4, true).await;
810        gen_and_sample_block(
811            &mut handle,
812            &mut generator,
813            &store,
814            &mut event_sub,
815            8,
816            false,
817        )
818        .await;
819    }
820
821    #[async_test]
822    async fn backward_dasing() {
823        let (mock, mut handle) = P2p::mocked();
824        let store = Arc::new(InMemoryStore::new());
825        let events = EventChannel::new();
826
827        let _daser = Daser::start(DaserArgs {
828            event_pub: events.publisher(),
829            p2p: Arc::new(mock),
830            store: store.clone(),
831            sampling_window: SAMPLING_WINDOW,
832            concurrency_limit: 1,
833            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
834        })
835        .unwrap();
836
837        let mut generator = ExtendedHeaderGenerator::new();
838
839        handle.expect_no_cmd().await;
840        handle.announce_peer_connected();
841        handle.expect_no_cmd().await;
842
843        let mut edses = Vec::new();
844        let mut headers = Vec::new();
845
846        for _ in 0..20 {
847            let eds = generate_dummy_eds(2);
848            let dah = DataAvailabilityHeader::from_eds(&eds);
849            let header = generator.next_with_dah(dah);
850
851            edses.push(eds);
852            headers.push(header);
853        }
854
855        // Insert 5-10 block headers
856        store.insert(headers[4..=9].to_vec()).await.unwrap();
857
858        // Sample block 10
859        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;
860
861        // Sample block 9
862        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;
863
864        // To avoid race conditions we wait a bit for the block 8 to be scheduled
865        sleep(Duration::from_millis(10)).await;
866
867        // Insert 16-20 block headers
868        store.insert(headers[15..=19].to_vec()).await.unwrap();
869
870        // To avoid race conditions we wait a bit for the new head (block 20) to be scheduled
871        sleep(Duration::from_millis(10)).await;
872
873        // Now daser runs two concurrent data sampling: block 8 and block 20
874        handle_concurrent_get_shwap_cid(
875            &mut handle,
876            [(8, &edses[9], false), (20, &edses[19], false)],
877        )
878        .await;
879
880        // Sample and reject block 19
881        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;
882
883        // Simulate disconnection
884        handle.announce_all_peers_disconnected();
885
886        // Daser may scheduled Block 18 already, so we need to reply to that requests.
887        // For the sake of the test we reply with a bitswap timeout.
888        while let Some(cmd) = handle.try_recv_cmd().await {
889            match cmd {
890                P2pCmd::GetShwapCid { respond_to, .. } => {
891                    let _ = respond_to.send(Err(P2pError::RequestTimedOut));
892                }
893                cmd => panic!("Unexpected command: {cmd:?}"),
894            }
895        }
896
897        // We shouldn't have any other requests from daser because it's in connecting state.
898        handle.expect_no_cmd().await;
899
900        // Simulate that a peer connected
901        handle.announce_peer_connected();
902
903        // Because of disconnection and previous rejection of block 19, daser will resample it
904        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;
905
906        // Sample block 16 until 18
907        for height in (16..=18).rev() {
908            let idx = height as usize - 1;
909            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
910        }
911
912        // Sample block 5 until 7
913        for height in (5..=7).rev() {
914            let idx = height as usize - 1;
915            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
916        }
917
918        handle.expect_no_cmd().await;
919
920        // Push block 21 in the store
921        let eds = generate_dummy_eds(2);
922        let dah = DataAvailabilityHeader::from_eds(&eds);
923        let header = generator.next_with_dah(dah);
924        store.insert(header).await.unwrap();
925
926        // Sample block 21
927        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;
928
929        handle.expect_no_cmd().await;
930    }
931
    /// Checks that `Daser` never runs more than `concurrency_limit` block
    /// samplings at once, and that freshly announced heads get an extra
    /// `additional_headersub_concurrency` budget on top of that limit.
    #[async_test]
    async fn concurrency_limits() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        // Concurrency limit
        let concurrency_limit = 10;
        // Additional concurrency limit for heads.
        // In other words concurrency limit becomes 15.
        let additional_headersub_concurrency = 5;
        // Default number of shares that ExtendedHeaderGenerator
        // generates per block.
        let shares_per_block = 4;

        let mut generator = ExtendedHeaderGenerator::new();
        store
            .insert(generator.next_many_empty_verified(30))
            .await
            .unwrap();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit,
            additional_headersub_concurrency,
        })
        .unwrap();

        // Nothing is scheduled before a peer connects.
        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        // Keep the responder channels alive so that the corresponding block
        // samplings stay "ongoing" and keep counting against the limit.
        let mut hold_respond_channels = Vec::new();

        for _ in 0..(concurrency_limit * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit reached
        handle.expect_no_cmd().await;

        // However, a new head will be allowed because additional limit is applied
        store
            .insert(generator.next_many_empty_verified(2))
            .await
            .unwrap();

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }
        handle.expect_no_cmd().await;

        // Now 11 blocks are ongoing. In order for Daser to schedule the next
        // one, 2 blocks need to finish.
        // We stop sampling for blocks 29 and 30 in order to simulate this.
        stop_sampling_for(&mut hold_respond_channels, 29);
        stop_sampling_for(&mut hold_respond_channels, 30);

        // Now Daser will schedule the next block.
        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // And... concurrency limit is reached again.
        handle.expect_no_cmd().await;

        // Generate 5 more heads
        for _ in 0..additional_headersub_concurrency {
            store
                .insert(generator.next_many_empty_verified(1))
                .await
                .unwrap();
            // Give some time for Daser to schedule it
            sleep(Duration::from_millis(10)).await;
        }

        for _ in 0..(additional_headersub_concurrency * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit for heads is reached, so even a new head
        // is not scheduled anymore.
        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;

        // Now we stop 1 block and Daser will schedule the head
        // we generated above.
        stop_sampling_for(&mut hold_respond_channels, 28);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // Concurrency limit for heads is reached again
        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;
    }
1043
    /// Checks that `Daser` rate-limits sampling of blocks inside the prunable
    /// area while the pruner reports a backlog of prunable blocks at or above
    /// `PRUNER_THRESHOLD`, and resumes once the backlog drops below it.
    #[async_test]
    async fn ratelimit() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: Duration::from_secs(60),
            // One block at a time keeps the request order deterministic.
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        // Backdate the chain: first header 1024s in the past, one block per second.
        let now = Time::now();
        let first_header_time = (now - Duration::from_secs(1024)).unwrap();
        generator.set_time(first_header_time, Duration::from_secs(1));
        store.insert(generator.next_many(990)).await.unwrap();

        let mut edses = HashMap::new();

        // Blocks 991-1000 carry real EDSes so their sampling can be answered later.
        for height in 991..=1000 {
            let eds = generate_dummy_eds(2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = generator.next_with_dah(dah);

            edses.insert(height, eds);
            store.insert(header).await.unwrap();
        }

        // Assume that Pruner reports the following
        daser.update_highest_prunable_block(1000).await.unwrap();
        daser.update_number_of_prunable_blocks(1000).await.unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        // Blocks that are currently stored are rate-limited, so we shouldn't get any command.
        handle.expect_no_cmd().await;

        // Any blocks above 1000 are not limited because they are not in prunable area
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;

        // Again back to rate-limited
        handle.expect_no_cmd().await;

        // Now Pruner reports that number of prunable blocks is lower than the threshold
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        // Sample blocks that are in prunable area
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&1000).unwrap(),
            1000,
            false,
        )
        .await;
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&999).unwrap(),
            999,
            false,
        )
        .await;

        // Now pruner reports prunable blocks reached the threshold again
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        // But sampling of 998 block was already scheduled, so we need to handle it
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&998).unwrap(),
            998,
            false,
        )
        .await;

        // Daser is rate-limited again
        handle.expect_no_cmd().await;
    }
1160
1161    fn stop_sampling_for(
1162        responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
1163        height: u64,
1164    ) {
1165        let mut indexes = Vec::new();
1166
1167        for (idx, (cid, _)) in responders.iter().enumerate() {
1168            let sample_id: SampleId = cid.try_into().unwrap();
1169            if sample_id.block_height() == height {
1170                indexes.push(idx)
1171            }
1172        }
1173
1174        for idx in indexes.into_iter().rev() {
1175            let (_cid, respond_to) = responders.remove(idx);
1176            respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
1177        }
1178    }
1179
1180    async fn gen_and_sample_block(
1181        handle: &mut MockP2pHandle,
1182        generator: &mut ExtendedHeaderGenerator,
1183        store: &InMemoryStore,
1184        event_sub: &mut EventSubscriber,
1185        square_width: usize,
1186        simulate_sampling_timeout: bool,
1187    ) {
1188        let eds = generate_dummy_eds(square_width);
1189        let dah = DataAvailabilityHeader::from_eds(&eds);
1190        let header = generator.next_with_dah(dah);
1191        let height = header.height();
1192
1193        store.insert(header).await.unwrap();
1194
1195        sample_block(
1196            handle,
1197            store,
1198            event_sub,
1199            &eds,
1200            height,
1201            simulate_sampling_timeout,
1202        )
1203        .await;
1204
1205        // `gen_and_sample_block` is only used in cases where Daser doesn't have
1206        // anything else to schedule.
1207        handle.expect_no_cmd().await;
1208        assert!(event_sub.try_recv().is_err());
1209    }
1210
    /// Drives a full sampling round of the block at `height` and validates
    /// everything observable about it.
    ///
    /// The helper answers every `get_shwap_cid` request with samples from
    /// `eds` (or times one of them out when `simulate_sampling_timeout` is
    /// set), then checks the store's sampled ranges and recorded CIDs, and
    /// finally the emitted `SamplingStarted`, per-share
    /// `ShareSamplingResult`, and `SamplingResult` events.
    async fn sample_block(
        handle: &mut MockP2pHandle,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        eds: &ExtendedDataSquare,
        height: u64,
        simulate_sampling_timeout: bool,
    ) {
        let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;

        // Wait to be sampled
        sleep(Duration::from_millis(100)).await;

        // Check if block was sampled or timed-out.
        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
        assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);

        // Check if CIDs we requested successfully made it in the store.
        // Both sides are sorted, so order of arrival doesn't matter.
        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();
        assert_eq!(&sampling_metadata.cids, &cids);

        // Check if we received `SamplingStarted` event
        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                // Make sure the share list matches the CIDs we received
                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        // Check if we received `ShareSamplingResult` for each share
        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    timed_out,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    // NOTE(review): this assumes share results are emitted in
                    // request order, so the timed-out one is the
                    // `REQ_TIMEOUT_SHARE_NUM`-th event — confirm against Daser.
                    assert_eq!(
                        timed_out,
                        simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
                    );
                    // Make sure it is in the list and remove it
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        // Every announced share must have produced exactly one result event.
        assert!(remaining_shares.is_empty());

        // Check if we received `SamplingResult` for the block
        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingResult {
                height: ev_height,
                timed_out,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(timed_out, simulate_sampling_timeout);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }
    }
1295
    /// Responds to get_shwap_cid and returns all CIDs that were requested
    ///
    /// Serves the requests of several concurrently sampled blocks: each
    /// request is routed by the height encoded in its CID and answered with
    /// a sample from that block's EDS. When a block's timeout flag is set,
    /// its `REQ_TIMEOUT_SHARE_NUM`-th request is answered with
    /// `P2pError::RequestTimedOut` instead. Returns the sorted list of
    /// requested CIDs.
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        // Per-block bookkeeping while serving requests.
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_sampling_timeout: bool,
            // How many sample requests are expected for this block.
            needed_samples: usize,
            // How many requests for this block were served so far.
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_sampling_timeout)| {
                let square_width = eds.square_width() as usize;
                // Daser requests at most `MAX_SAMPLES_NEEDED` samples per block.
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_sampling_timeout,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            // Route the request to the block it belongs to.
            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            // Simulate sampling timeout
            if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
                respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        // Sorted so callers can compare against the store's sorted CID list.
        cids.sort();
        cids
    }
1353
1354    /// Responds to get_shwap_cid and returns all CIDs that were requested
1355    async fn handle_get_shwap_cid(
1356        handle: &mut MockP2pHandle,
1357        height: u64,
1358        eds: &ExtendedDataSquare,
1359        simulate_sampling_timeout: bool,
1360    ) -> Vec<Cid> {
1361        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
1362    }
1363
1364    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
1365        let sample = Sample::new(
1366            sample_id.row_index(),
1367            sample_id.column_index(),
1368            AxisType::Row,
1369            eds,
1370        )
1371        .unwrap();
1372
1373        let mut container = BytesMut::new();
1374        sample.encode(&mut container);
1375
1376        let block = Block {
1377            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
1378            container: container.to_vec(),
1379        };
1380
1381        block.encode_to_vec()
1382    }
1383}