lumina_node/daser.rs

//! Component responsible for data availability sampling of the already synchronized block
//! headers announced on the Celestia network.
//!
//! Sampling procedure comprises the following steps:
//!
//! 1. Daser waits for at least one peer to connect.
//! 2. Daser iterates in descending order over all the headers stored in the [`Store`].
//!     - If a block has not been sampled or it was rejected, Daser will queue it for sampling.
//!       Rejected blocks are resampled because their rejection could be caused by
//!       edge-cases unrelated to data availability, such as network issues.
//!     - If a block is not within the sampling window, it is not queued.
//!     - The queue is always sorted in descending order to give priority to the latest blocks.
//! 3. As new headers become available in the [`Store`], Daser adds them to the queue if
//!    they are within the sampling window.
//! 4. If at any point a new HEAD is queued, it is scheduled immediately and concurrently.
//!    Otherwise Daser waits for any ongoing sampling to finish and schedules the next block
//!    from the queue. Daser executes the following procedure for every scheduled block:
//!     - It makes sure that the block is still within the sampling window.
//!     - It selects which random shares are going to be sampled and generates their Shwap CIDs.
//!     - It updates the [`Store`] with the CIDs that are going to be sampled. Tracking of the
//!       CIDs is needed for pruning them later on. This is done before retrieval of the CIDs
//!       is started, because otherwise the user could stop the node after Bitswap stores the
//!       block in the blockstore but before we record that in the [`Store`], causing a leak.
//!     - It initiates Bitswap retrieval requests for the specified CIDs.
//!     - If all CIDs are received, the block is considered sampled and accepted.
//!     - If we reach a timeout of 10 seconds and at least one of the CIDs has not been
//!       received, the block is considered sampled and rejected.
//!     - The [`Store`] is updated with the sampling result.
//! 5. Steps 3 and 4 are repeated concurrently until we detect that all peers have disconnected.
//!    At that point Daser clears the queue and moves back to step 1.
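//!
//! # Example
//!
//! A minimal, illustrative sketch of how the node could drive the [`Daser`]
//! (the component is crate-internal; `p2p`, `store` and `event_pub` are assumed
//! to be constructed elsewhere, and `DEFAULT_SAMPLING_WINDOW` comes from the
//! node configuration):
//!
//! ```ignore
//! let daser = Daser::start(DaserArgs {
//!     p2p,
//!     store,
//!     event_pub,
//!     sampling_window: DEFAULT_SAMPLING_WINDOW,
//! })?;
//!
//! // ... Daser samples new blocks as the Syncer stores their headers ...
//!
//! daser.stop();
//! daser.join().await;
//! ```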

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lumina_utils::executor::{spawn, JoinHandle};
use lumina_utils::time::Instant;
use rand::Rng;
use tendermint::Time;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};

use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{BlockRanges, SamplingStatus, Store, StoreError};

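// NOTE: 16 samples give a strong availability guarantee. To make a block
// unrecoverable an attacker must withhold at least ~1/4 of the shares of the
// extended square, so each uniformly random sample detects the attack with
// probability of roughly 1/4 or more, and all 16 samples miss it with
// probability of at most about (3/4)^16 ≈ 1%.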
const MAX_SAMPLES_NEEDED: usize = 16;
const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);

type Result<T, E = DaserError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur in the `Daser` component.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),
}

/// Component responsible for data availability sampling of blocks from the network.
pub(crate) struct Daser {
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
}

/// Arguments used to configure the [`Daser`].
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    /// Handler for the peer to peer messaging.
    pub(crate) p2p: Arc<P2p>,
    /// Headers storage.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Size of the sampling window.
    pub(crate) sampling_window: Duration,
}

impl Daser {
    /// Create and start the [`Daser`].
    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
    where
        S: Store + 'static,
    {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let mut worker = Worker::new(args, cancellation_token.child_token())?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Daser stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalDaserError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Daser {
            cancellation_token,
            join_handle,
        })
    }

    /// Stop the worker.
    pub(crate) fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }

    /// Wait until the worker is completely stopped.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }
}

impl Drop for Daser {
    fn drop(&mut self) {
        self.stop();
    }
}

struct Worker<S>
where
    S: Store + 'static,
{
    cancellation_token: CancellationToken,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    max_samples_needed: usize,
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
    queue: BlockRanges,
    done: BlockRanges,
    ongoing: BlockRanges,
    prev_head: Option<u64>,
    sampling_window: Duration,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(args: DaserArgs<S>, cancellation_token: CancellationToken) -> Result<Worker<S>> {
        Ok(Worker {
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            done: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            prev_head: None,
            sampling_window: args.sampling_window,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if connection status changed before the watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
            }
        }
    }

    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if connection status changed before the watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        // Workaround because `wait_new_head` is not cancel-safe.
        //
        // TODO: Only the Syncer adds new headers to the store, so ideally
        // the Syncer should inform Daser that new headers were added.
        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.populate_queue().await?;

        loop {
            // If we have a new HEAD queued, schedule it now!
            if let Some(queue_head) = self.queue.head() {
                if queue_head > self.prev_head.unwrap_or(0) {
                    self.schedule_next_sample_block().await?;
                    self.prev_head = Some(queue_head);
                }
            }

            // If there is no ongoing data sampling, schedule the next one.
            if self.sampling_futs.is_empty() {
                self.schedule_next_sample_block().await?;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                Some(res) = self.sampling_futs.next() => {
                    // Beetswap only returns fatal errors that are not related
                    // to P2P or networking.
                    let (height, accepted) = res?;

                    let status = if accepted {
                        SamplingStatus::Accepted
                    } else {
                        SamplingStatus::Rejected
                    };

                    self.store
                        .update_sampling_metadata(height, status, Vec::new())
                        .await?;

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                    self.done.insert_relaxed(height..=height).expect("invalid height");
                },
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.populate_queue().await?;
                }
            }
        }

        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.done = BlockRanges::default();
        self.prev_head = None;

        Ok(())
    }

    async fn schedule_next_sample_block(&mut self) -> Result<()> {
        // Schedule the most recent un-sampled block.
        let header = loop {
            let Some(height) = self.queue.pop_head() else {
                return Ok(());
            };

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    // Height was pruned and our queue is inconsistent.
                    // Repopulate queue and try again.
                    self.populate_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height().value();
        let square_width = header.dah.square_width();

        // Make sure that the block is still in the sampling window.
        if !self.in_sampling_window(header.time()) {
            // As soon as we reach a block that is not in the sampling
            // window, the rest won't be either: the queue is sorted in
            // descending order, so every remaining block is older.
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.done
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(());
        }

        // Select random shares to be sampled
        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        // Update the CID list before we start sampling, otherwise it's possible for us
        // to leak CIDs causing associated blocks to never get cleaned from blockstore.
        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;

        // NOTE: The pruning window is always 1 hour bigger than the sampling
        // window, so after the `in_sampling_window` check above we no longer
        // need to care about `StoreError::NotFound`.
        self.store
            .update_sampling_metadata(height, SamplingStatus::Unknown, cids)
            .await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();

        // Schedule retrieval of the CIDs. This will be run later on in the `select!` loop.
        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            // Initialize all futures
            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p
                            .get_sample(row, col, height, Some(GET_SAMPLE_TIMEOUT))
                            .await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut block_accepted = true;

            // Run futures to completion
            while let Some((row, column, res)) = futs.next().await {
                let share_accepted = match res {
                    Ok(_) => true,
                    // Validation is done at the Bitswap level, through `ShwapMultihasher`.
                    // If the sample is not valid, it will never be delivered to us
                    // as the data of the CID. Because of that, the only signal
                    // that data sampling verification failed is the query timing out.
                    Err(P2pError::BitswapQueryTimeout) => false,
                    Err(e) => return Err(e.into()),
                };

                block_accepted &= share_accepted;

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    accepted: share_accepted,
                });
            }

            event_pub.send(NodeEvent::SamplingFinished {
                height,
                accepted: block_accepted,
                took: now.elapsed(),
            });

            Ok((height, block_accepted))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(())
    }

    /// Add to the queue the blocks that need to be sampled.
    ///
    /// NOTE: We resample rejected blocks, because rejection can happen
    /// in some unrelated edge-cases, such as network issues. This is a Shwap
    /// limitation that comes from Bitswap: the only way for us to know that
    /// sampling failed is via a timeout.
    async fn populate_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let accepted = self.store.get_accepted_sampling_ranges().await?;

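        // `BlockRanges` subtraction below is plain set difference. As a
        // hypothetical example: with stored = 1..=20, accepted = 1..=10,
        // done = {} and ongoing = 19..=20, the resulting queue is 11..=18.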
        self.queue = stored - accepted - &self.done - &self.ongoing;

        Ok(())
    }

    /// Returns true if `time` is within the sampling window.
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        // Header is from the future! Thus, within sampling window.
        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}

/// Returns unique and random indexes that will be used for sampling.
fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
    let samples_in_block = usize::from(square_width).pow(2);

    // If the block has no more shares than `max_samples_needed`, we are going
    // to sample the whole block. Randomness is not needed for this.
    if samples_in_block <= max_samples_needed {
        return (0..square_width)
            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
            .collect();
    }

    let mut indexes = HashSet::with_capacity(max_samples_needed);
    let mut rng = rand::thread_rng();

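    // Draw random coordinates until enough unique ones are collected; the
    // `HashSet` deduplicates collisions. Since Celestia square widths are
    // powers of two, the modulo below should not bias the distribution.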
    while indexes.len() < max_samples_needed {
        let row = rng.gen::<u16>() % square_width;
        let col = rng.gen::<u16>() % square_width;
        indexes.insert((row, col));
    }

    indexes
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{EventChannel, EventSubscriber};
    use crate::node::DEFAULT_SAMPLING_WINDOW;
    use crate::p2p::shwap::convert_cid;
    use crate::p2p::P2pCmd;
    use crate::store::InMemoryStore;
    use crate::test_utils::MockP2pHandle;
    use bytes::BytesMut;
    use celestia_proto::bitswap::Block;
    use celestia_types::consts::appconsts::AppVersion;
    use celestia_types::sample::{Sample, SampleId};
    use celestia_types::test_utils::{generate_dummy_eds, ExtendedHeaderGenerator};
    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
    use cid::Cid;
    use lumina_utils::test_utils::async_test;
    use lumina_utils::time::sleep;
    use prost::Message;
    use std::collections::HashMap;
    use std::time::Duration;

    // Request number for which tests will simulate invalid sampling
    //
    // NOTE: The smallest block has 4 shares, so a 2nd request will always happen.
    const INVALID_SHARE_REQ_NUM: usize = 2;

    #[async_test]
    async fn received_valid_samples() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 16, false).await;
    }

    #[async_test]
    async fn received_invalid_sample() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, true).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
    }

    #[async_test]
    async fn backward_dasing() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        let mut edses = Vec::new();
        let mut headers = Vec::new();

        for _ in 0..20 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = gen.next_with_dah(dah);

            edses.push(eds);
            headers.push(header);
        }

        // Insert block headers 5 to 10
        store.insert(headers[4..=9].to_vec()).await.unwrap();

        // Sample block 10
        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;

        // Sample block 9
        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;

        // To avoid race conditions we wait a bit for block 8 to be scheduled
        sleep(Duration::from_millis(10)).await;

        // Insert block headers 16 to 20
        store.insert(headers[15..=19].to_vec()).await.unwrap();

        // To avoid race conditions we wait a bit for the new head (block 20) to be scheduled
        sleep(Duration::from_millis(10)).await;

        // Now Daser runs two data samplings concurrently: block 8 and block 20
        handle_concurrent_get_shwap_cid(
            &mut handle,
            [(8, &edses[7], false), (20, &edses[19], false)],
        )
        .await;

        // Sample and reject block 19
        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;

        // Simulate disconnection
        handle.announce_all_peers_disconnected();

        // Daser may have scheduled block 18 already, so we need to reply to those requests.
        // For the sake of the test we reply with a bitswap timeout.
        while let Some(cmd) = handle.try_recv_cmd().await {
            match cmd {
                P2pCmd::GetShwapCid { respond_to, .. } => {
                    let _ = respond_to.send(Err(P2pError::BitswapQueryTimeout));
                }
                cmd => panic!("Unexpected command: {cmd:?}"),
            }
        }

        // We shouldn't get any other requests from Daser because it's in the connecting state.
        handle.expect_no_cmd().await;

        // Simulate that a peer connected
        handle.announce_peer_connected();

        // Because of the disconnection and the previous rejection of block 19, Daser will resample it
        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;

        // Sample blocks 18 down to 16
        for height in (16..=18).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        // Sample blocks 7 down to 5
        for height in (5..=7).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        handle.expect_no_cmd().await;

        // Push block 21 into the store
        let eds = generate_dummy_eds(2, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        store.insert(header).await.unwrap();

        // Sample block 21
        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;

        handle.expect_no_cmd().await;
    }

    async fn gen_and_sample_block(
        handle: &mut MockP2pHandle,
        gen: &mut ExtendedHeaderGenerator,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        square_width: usize,
        simulate_invalid_sampling: bool,
    ) {
        let eds = generate_dummy_eds(square_width, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        let height = header.height().value();

        store.insert(header).await.unwrap();

        let cids = handle_get_shwap_cid(handle, height, &eds, simulate_invalid_sampling).await;
        handle.expect_no_cmd().await;

        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();

        if simulate_invalid_sampling {
            assert_eq!(sampling_metadata.status, SamplingStatus::Rejected);
        } else {
            assert_eq!(sampling_metadata.status, SamplingStatus::Accepted);
        }

        // Check if the CIDs we received successfully made it into the store
        assert_eq!(&sampling_metadata.cids, &cids);

        // Check if we received the `SamplingStarted` event
        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                // Make sure the share list matches the CIDs we received
                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        // Check if we received `ShareSamplingResult` for each share
        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    accepted,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    assert_eq!(
                        accepted,
                        !(simulate_invalid_sampling && i == INVALID_SHARE_REQ_NUM)
                    );
                    // Make sure it is in the list and remove it
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        assert!(remaining_shares.is_empty());

        // Check if we received the `SamplingFinished` event
        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingFinished {
                height: ev_height,
                accepted,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(accepted, !simulate_invalid_sampling);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }

        assert!(event_sub.try_recv().is_err());
    }

    /// Responds to `get_shwap_cid` requests and returns all the CIDs that were requested, sorted.
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_invalid_sampling: bool,
            needed_samples: usize,
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_invalid_sampling)| {
                let square_width = eds.square_width() as usize;
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_invalid_sampling,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            // Simulate invalid sample by triggering BitswapQueryTimeout
            if info.simulate_invalid_sampling && info.requests_count == INVALID_SHARE_REQ_NUM {
                respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        cids.sort();
        cids
    }

    /// Responds to `get_shwap_cid` for a single height and returns all the CIDs that were requested.
    async fn handle_get_shwap_cid(
        handle: &mut MockP2pHandle,
        height: u64,
        eds: &ExtendedDataSquare,
        simulate_invalid_sampling: bool,
    ) -> Vec<Cid> {
        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_invalid_sampling)]).await
    }

    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
        let sample = Sample::new(
            sample_id.row_index(),
            sample_id.column_index(),
            AxisType::Row,
            eds,
        )
        .unwrap();

        let mut container = BytesMut::new();
        sample.encode(&mut container);

        let block = Block {
            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
            container: container.to_vec(),
        };

        block.encode_to_vec()
    }
}
841}