use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lumina_utils::executor::{spawn, JoinHandle};
use lumina_utils::time::{Instant, Interval};
use rand::Rng;
use tendermint::Time;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{BlockRanges, Store, StoreError};
use crate::utils::{OneshotSenderExt, TimeExt};

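/// Maximum number of shares that are randomly selected and sampled per block.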
const MAX_SAMPLES_NEEDED: usize = 16;
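/// Lower bound for the timeout of a single sample request.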
const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
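/// Once at least this many blocks are awaiting pruning, sampling of prunable
/// heights is paused so that the pruner can catch up.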
const PRUNER_THRESHOLD: u64 = 512;

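/// Default number of blocks that are sampled concurrently.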
pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
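/// Default number of extra sampling slots reserved for new blocks arriving
/// via header-sub (i.e. the current network head).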
pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;

type Result<T, E = DaserError> = std::result::Result<T, E>;

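/// Representation of all the errors that can occur in the [`Daser`] component.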
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    #[error("Store: {0}")]
    Store(#[from] StoreError),

    #[error("Worker died")]
    WorkerDied,

    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl From<oneshot::error::RecvError> for DaserError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        DaserError::ChannelClosedUnexpectedly
    }
}

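/// Component that performs data availability sampling (DAS) of blocks in the
/// [`Store`] by requesting random shares of each block from the network.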
pub(crate) struct Daser {
    cmd_tx: mpsc::Sender<DaserCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
}

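/// Arguments used to configure the [`Daser`].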
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    pub(crate) p2p: Arc<P2p>,
    pub(crate) store: Arc<S>,
    pub(crate) event_pub: EventPublisher,
    pub(crate) sampling_window: Duration,
    pub(crate) concurrency_limit: usize,
    pub(crate) additional_headersub_concurrency: usize,
}

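/// Commands accepted by the [`Daser`] worker.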
#[derive(Debug)]
pub(crate) enum DaserCmd {
    UpdateHighestPrunableHeight {
        value: u64,
    },

    UpdateNumberOfPrunableBlocks {
        value: u64,
    },

    WantToPrune {
        height: u64,
        respond_to: oneshot::Sender<bool>,
    },
}

impl Daser {
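    /// Create and start the [`Daser`] worker.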
    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
    where
        S: Store + 'static,
    {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Daser stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalDaserError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        })
    }

    #[cfg(test)]
    pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let cancellation_token = CancellationToken::new();

        let join_handle = spawn(async {});

        let daser = Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        };

        let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };

        (daser, mock_handle)
    }

    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| DaserError::WorkerDied)
    }

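    /// Ask the worker whether the block at `height` can be pruned.
    ///
    /// Returns `false` if the block is currently being sampled.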
    pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
        let (tx, rx) = oneshot::channel();

        self.send_command(DaserCmd::WantToPrune {
            height,
            respond_to: tx,
        })
        .await?;

        Ok(rx.await?)
    }

    pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
            .await
    }

    pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
            .await
    }
}

impl Drop for Daser {
    fn drop(&mut self) {
        self.stop();
    }
}

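/// Worker that drives the sampling loop.
///
/// Block heights move between a few bookkeeping sets: `queue` (waiting to be
/// sampled), `ongoing` (currently being sampled), `timed_out` (sampling timed
/// out or the block left the sampling window) and `will_be_pruned` (handed
/// over to the pruner).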
struct Worker<S>
where
    S: Store + 'static,
{
    cmd_rx: mpsc::Receiver<DaserCmd>,
    cancellation_token: CancellationToken,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    max_samples_needed: usize,
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
    queue: BlockRanges,
    timed_out: BlockRanges,
    ongoing: BlockRanges,
    will_be_pruned: BlockRanges,
    sampling_window: Duration,
    concurrency_limit: usize,
    additional_headersub_concurrency: usize,
    head_height: Option<u64>,
    highest_prunable_height: Option<u64>,
    num_of_prunable_blocks: u64,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: DaserArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<DaserCmd>,
    ) -> Result<Worker<S>> {
        Ok(Worker {
            cmd_rx,
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            timed_out: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            will_be_pruned: BlockRanges::default(),
            sampling_window: args.sampling_window,
            concurrency_limit: args.concurrency_limit,
            additional_headersub_concurrency: args.additional_headersub_concurrency,
            head_height: None,
            highest_prunable_height: None,
            num_of_prunable_blocks: 0,
        })
    }

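    /// Main loop: alternate between waiting for peers and sampling while connected.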
    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

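    /// Wait until at least one peer is connected, while still serving commands.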
    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
            }
        }
    }

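    /// Sampling loop that runs while at least one peer is connected.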
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.update_queue().await?;

        let mut first_report = true;

        loop {
            while self.schedule_next_sample_block().await? {}

            if first_report {
                self.report().await?;
                first_report = false;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => self.report().await?,
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
                Some(res) = self.sampling_futs.next() => {
                    let (height, timed_out) = res?;

                    if timed_out {
                        self.timed_out.insert_relaxed(height..=height).expect("invalid height");
                    } else {
                        self.store.mark_as_sampled(height).await?;
                    }

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                },
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.update_queue().await?;
                }
            }
        }

        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.timed_out = BlockRanges::default();
        self.head_height = None;

        Ok(())
    }

    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let sampled = self.store.get_sampled_ranges().await?;

        info!(
            "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
            sampled, &self.ongoing,
        );

        Ok(())
    }

    async fn on_cmd(&mut self, cmd: DaserCmd) {
        match cmd {
            DaserCmd::WantToPrune { height, respond_to } => {
                let res = self.on_want_to_prune(height).await;
                respond_to.maybe_send(res);
            }
            DaserCmd::UpdateHighestPrunableHeight { value } => {
                self.highest_prunable_height = Some(value);
            }
            DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
                self.num_of_prunable_blocks = value;
            }
        }
    }

    async fn on_want_to_prune(&mut self, height: u64) -> bool {
        if self.ongoing.contains(height) {
            return false;
        }

        self.queue
            .remove_relaxed(height..=height)
            .expect("invalid height");
        self.will_be_pruned
            .insert_relaxed(height..=height)
            .expect("invalid height");

        true
    }

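    /// Pop the next height from the queue and spawn a sampling future for it.
    ///
    /// Returns `Ok(true)` if a block was scheduled, or `Ok(false)` if the queue
    /// is empty, the concurrency limit was reached, or the block fell outside
    /// of the sampling window.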
    async fn schedule_next_sample_block(&mut self) -> Result<bool> {
        let header = loop {
            let Some(height) = self.queue.pop_head() else {
                return Ok(false);
            };

            let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
                && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
            {
                0
            } else if height == self.head_height.unwrap_or(0) {
                self.concurrency_limit + self.additional_headersub_concurrency
            } else {
                self.concurrency_limit
            };

            if self.sampling_futs.len() >= concurrency_limit {
                self.queue
                    .insert_relaxed(height..=height)
                    .expect("invalid height");
                return Ok(false);
            }

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    self.update_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height().value();
        let square_width = header.dah.square_width();

        if !self.in_sampling_window(header.time()) {
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.timed_out
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(false);
        }

        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;
        self.store.update_sampling_metadata(height, cids).await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();
        let sampling_window = self.sampling_window;

        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            let timeout = calc_timeout(header.time(), Time::now(), sampling_window);

            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p.get_sample(row, col, height, Some(timeout)).await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut sampling_timed_out = false;

            while let Some((row, column, res)) = futs.next().await {
                let timed_out = match res {
                    Ok(_) => false,
                    Err(P2pError::BitswapQueryTimeout) => true,
                    Err(e) => return Err(e.into()),
                };

                if timed_out {
                    sampling_timed_out = true;
                }

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    timed_out,
                });
            }

            event_pub.send(NodeEvent::SamplingResult {
                height,
                timed_out: sampling_timed_out,
                took: now.elapsed(),
            });

            Ok((height, sampling_timed_out))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(true)
    }

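    /// Rebuild the queue: stored headers minus the ranges that are already
    /// sampled, timed out, being sampled, or scheduled for pruning.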
    async fn update_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let sampled = self.store.get_sampled_ranges().await?;

        self.head_height = stored.head();
        self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;

        Ok(())
    }

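    /// Check whether a block with the given timestamp is still within the sampling window.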
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}

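/// Calculate the per-sample request timeout: the time remaining until the block
/// falls out of the sampling window, but never less than [`GET_SAMPLE_MIN_TIMEOUT`].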
fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
    let sampling_window_end = now.saturating_sub(sampling_window);

    header_time
        .duration_since(sampling_window_end)
        .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
        .max(GET_SAMPLE_MIN_TIMEOUT)
}

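/// Pick up to `max_samples_needed` unique random `(row, column)` coordinates.
/// If the square has fewer shares than that, return all of them.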
fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
    let samples_in_block = usize::from(square_width).pow(2);

    if samples_in_block <= max_samples_needed {
        return (0..square_width)
            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
            .collect();
    }

    let mut indexes = HashSet::with_capacity(max_samples_needed);
    let mut rng = rand::thread_rng();

    while indexes.len() < max_samples_needed {
        let row = rng.gen::<u16>() % square_width;
        let col = rng.gen::<u16>() % square_width;
        indexes.insert((row, col));
    }

    indexes
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{EventChannel, EventSubscriber};
    use crate::node::SAMPLING_WINDOW;
    use crate::p2p::shwap::convert_cid;
    use crate::p2p::P2pCmd;
    use crate::store::InMemoryStore;
    use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
    use crate::utils::OneshotResultSender;
    use bytes::BytesMut;
    use celestia_proto::bitswap::Block;
    use celestia_types::consts::appconsts::AppVersion;
    use celestia_types::sample::{Sample, SampleId};
    use celestia_types::test_utils::{generate_dummy_eds, ExtendedHeaderGenerator};
    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
    use cid::Cid;
    use lumina_utils::test_utils::async_test;
    use lumina_utils::time::sleep;
    use prost::Message;
    use std::collections::HashMap;
    use std::time::Duration;

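    /// When a sampling timeout is simulated, the n-th share request (1-based)
    /// of a block is answered with `BitswapQueryTimeout`.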
    const REQ_TIMEOUT_SHARE_NUM: usize = 2;

    #[async_test]
    async fn check_calc_timeout() {
        let now = Time::now();
        let sampling_window = Duration::from_secs(60);

        let header_time = now;
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(60)
        );

        let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(59)
        );

        let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(11)
        );

        let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(10)
        );

        let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(61)
        );
    }

    #[async_test]
    async fn received_valid_samples() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 16, false).await;
    }

    #[async_test]
    async fn sampling_timeout() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, true).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
    }

    #[async_test]
    async fn backward_dasing() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        let mut edses = Vec::new();
        let mut headers = Vec::new();

        for _ in 0..20 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = gen.next_with_dah(dah);

            edses.push(eds);
            headers.push(header);
        }

        // Insert headers at heights 5..=10.
        store.insert(headers[4..=9].to_vec()).await.unwrap();

        // Sampling proceeds from the highest stored height downwards.
        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;

        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;

        sleep(Duration::from_millis(10)).await;

        // Insert headers at heights 16..=20.
        store.insert(headers[15..=19].to_vec()).await.unwrap();

        sleep(Duration::from_millis(10)).await;

        // Height 8 is already ongoing and the new head (20) gets an additional
        // header-sub slot, so both are sampled concurrently.
        handle_concurrent_get_shwap_cid(
            &mut handle,
            [(8, &edses[9], false), (20, &edses[19], false)],
        )
        .await;

        // Simulate a sampling timeout for height 19.
        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;

        handle.announce_all_peers_disconnected();

        // Answer any requests that were already in flight so their futures can finish.
        while let Some(cmd) = handle.try_recv_cmd().await {
            match cmd {
                P2pCmd::GetShwapCid { respond_to, .. } => {
                    let _ = respond_to.send(Err(P2pError::BitswapQueryTimeout));
                }
                cmd => panic!("Unexpected command: {cmd:?}"),
            }
        }

        handle.expect_no_cmd().await;

        handle.announce_peer_connected();

        // After reconnecting, the previously timed out height 19 is retried.
        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;

        for height in (16..=18).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        for height in (5..=7).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        handle.expect_no_cmd().await;

        // A new head (height 21) is sampled as soon as it is stored.
        let eds = generate_dummy_eds(2, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        store.insert(header).await.unwrap();

        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;

        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn concurrency_limits() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let concurrency_limit = 10;
        let additional_headersub_concurrency = 5;
        let shares_per_block = 4;

        let mut gen = ExtendedHeaderGenerator::new();
        store.insert(gen.next_many_verified(30)).await.unwrap();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit,
            additional_headersub_concurrency,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        let mut hold_respond_channels = Vec::new();

        // With 30 blocks in the store, exactly `concurrency_limit` blocks are sampled at once.
        for _ in 0..(concurrency_limit * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;

        // A new head bypasses the regular limit thanks to the additional header-sub slots.
        store.insert(gen.next_many_verified(2)).await.unwrap();

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }
        handle.expect_no_cmd().await;

        // Completing two blocks frees regular slots, so one more queued block fits under the limit.
        stop_sampling_for(&mut hold_respond_channels, 29);
        stop_sampling_for(&mut hold_respond_channels, 30);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;

        // Each new head consumes one of the additional header-sub slots.
        for _ in 0..additional_headersub_concurrency {
            store.insert(gen.next_many_verified(1)).await.unwrap();
            sleep(Duration::from_millis(10)).await;
        }

        for _ in 0..(additional_headersub_concurrency * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // All slots are taken now, so yet another new head is not sampled immediately.
        handle.expect_no_cmd().await;
        store.insert(gen.next_many_verified(1)).await.unwrap();
        handle.expect_no_cmd().await;

        // Finishing a block frees a slot for the next queued height.
        stop_sampling_for(&mut hold_respond_channels, 28);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;
        store.insert(gen.next_many_verified(1)).await.unwrap();
        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn ratelimit() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: Duration::from_secs(60),
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        let now = Time::now();
        let first_header_time = (now - Duration::from_secs(1024)).unwrap();
        gen.set_time(first_header_time, Duration::from_secs(1));
        store.insert(gen.next_many(990)).await.unwrap();

        let mut edses = HashMap::new();

        for height in 991..=1000 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = gen.next_with_dah(dah);

            edses.insert(height, eds);
            store.insert(header).await.unwrap();
        }

        // The pruner reports a backlog above the threshold, so sampling of prunable heights is paused.
        daser.update_highest_prunable_block(1000).await.unwrap();
        daser.update_number_of_prunable_blocks(1000).await.unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        handle.expect_no_cmd().await;

        // Blocks above the highest prunable height are still sampled.
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;

        handle.expect_no_cmd().await;

        // Dropping below the threshold resumes sampling of prunable heights.
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&1000).unwrap(),
            1000,
            false,
        )
        .await;
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&999).unwrap(),
            999,
            false,
        )
        .await;

        // Reaching the threshold pauses sampling again, but the block that was
        // already scheduled still completes.
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&998).unwrap(),
            998,
            false,
        )
        .await;

        handle.expect_no_cmd().await;
    }

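    /// Respond with `BitswapQueryTimeout` to all pending requests that belong
    /// to `height`, removing them from `responders`.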
    fn stop_sampling_for(
        responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
        height: u64,
    ) {
        let mut indexes = Vec::new();

        for (idx, (cid, _)) in responders.iter().enumerate() {
            let sample_id: SampleId = cid.try_into().unwrap();
            if sample_id.block_height() == height {
                indexes.push(idx)
            }
        }

        for idx in indexes.into_iter().rev() {
            let (_cid, respond_to) = responders.remove(idx);
            respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
        }
    }

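    /// Generate a block with the given square width, insert it into the store
    /// and drive its sampling via [`sample_block`].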
    async fn gen_and_sample_block(
        handle: &mut MockP2pHandle,
        gen: &mut ExtendedHeaderGenerator,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        square_width: usize,
        simulate_sampling_timeout: bool,
    ) {
        let eds = generate_dummy_eds(square_width, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        let height = header.height().value();

        store.insert(header).await.unwrap();

        sample_block(
            handle,
            store,
            event_sub,
            &eds,
            height,
            simulate_sampling_timeout,
        )
        .await;

        handle.expect_no_cmd().await;
        assert!(event_sub.try_recv().is_err());
    }

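    /// Serve the sampling of a single block and assert the resulting store
    /// state and emitted events.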
    async fn sample_block(
        handle: &mut MockP2pHandle,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        eds: &ExtendedDataSquare,
        height: u64,
        simulate_sampling_timeout: bool,
    ) {
        let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;

        sleep(Duration::from_millis(100)).await;

        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
        assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);

        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();
        assert_eq!(&sampling_metadata.cids, &cids);

        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    timed_out,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    assert_eq!(
                        timed_out,
                        simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
                    );
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        assert!(remaining_shares.is_empty());

        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingResult {
                height: ev_height,
                timed_out,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(timed_out, simulate_sampling_timeout);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }
    }

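    /// Serve `GetShwapCid` requests for blocks that are sampled concurrently,
    /// optionally simulating a timeout, and return the requested CIDs (sorted).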
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_sampling_timeout: bool,
            needed_samples: usize,
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_sampling_timeout)| {
                let square_width = eds.square_width() as usize;
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_sampling_timeout,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
                respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        cids.sort();
        cids
    }

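    /// Same as [`handle_concurrent_get_shwap_cid`], but for a single block.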
    async fn handle_get_shwap_cid(
        handle: &mut MockP2pHandle,
        height: u64,
        eds: &ExtendedDataSquare,
        simulate_sampling_timeout: bool,
    ) -> Vec<Cid> {
        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
    }

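    /// Build the Bitswap block (protobuf-encoded [`Sample`]) that answers a
    /// request for `sample_id`.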
    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
        let sample = Sample::new(
            sample_id.row_index(),
            sample_id.column_index(),
            AxisType::Row,
            eds,
        )
        .unwrap();

        let mut container = BytesMut::new();
        sample.encode(&mut container);

        let block = Block {
            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
            container: container.to_vec(),
        };

        block.encode_to_vec()
    }
}