1use std::collections::HashSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::future::BoxFuture;
9use futures::stream::FuturesUnordered;
10use futures::{FutureExt, StreamExt};
11use lumina_utils::executor::{JoinHandle, spawn};
12use lumina_utils::time::{Instant, Interval};
13use rand::Rng;
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::events::{EventPublisher, NodeEvent};
21use crate::p2p::shwap::sample_cid;
22use crate::p2p::{P2p, P2pError};
23use crate::store::{BlockRanges, Store, StoreError};
24use crate::utils::{OneshotSenderExt, TimeExt};
25
// Maximum number of random shares requested per block during sampling.
const MAX_SAMPLES_NEEDED: usize = 16;
// Lower bound for a single `get_sample` request timeout.
const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
// When at least this many blocks are waiting to be pruned, sampling of
// prunable heights is paused so the pruner can catch up.
const PRUNER_THRESHOLD: u64 = 512;

// Default number of blocks sampled concurrently.
// NOTE(review): "CONCURENCY" spelling is kept as-is; it is part of the crate-visible name.
pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
// Default extra concurrency slots reserved for blocks arriving via header-sub (the head).
pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;

/// Module-local alias defaulting the error type to [`DaserError`].
type Result<T, E = DaserError> = std::result::Result<T, E>;
34
/// Errors produced by [`Daser`] and its background worker.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    /// An error propagated from the P2P layer.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the header/sampling store.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The background worker task is gone; commands can no longer be delivered.
    #[error("Worker died")]
    WorkerDied,

    /// A response channel was dropped before a reply arrived.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}
54
55impl From<oneshot::error::RecvError> for DaserError {
56 fn from(_value: oneshot::error::RecvError) -> Self {
57 DaserError::ChannelClosedUnexpectedly
58 }
59}
60
/// Handle to the data availability sampling (DAS) background worker.
pub(crate) struct Daser {
    /// Channel for sending [`DaserCmd`]s to the worker task.
    cmd_tx: mpsc::Sender<DaserCmd>,
    /// Token used to request worker shutdown.
    cancellation_token: CancellationToken,
    /// Handle of the spawned worker task; awaited by [`Daser::join`].
    join_handle: JoinHandle,
}
67
/// Arguments required to start a [`Daser`].
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    /// Handle to the P2P layer used for fetching samples.
    pub(crate) p2p: Arc<P2p>,
    /// Header and sampling-metadata store.
    pub(crate) store: Arc<S>,
    /// Publisher for node events emitted while sampling.
    pub(crate) event_pub: EventPublisher,
    /// Only blocks younger than this window are sampled.
    pub(crate) sampling_window: Duration,
    /// Maximum number of blocks sampled concurrently.
    pub(crate) concurrency_limit: usize,
    /// Extra concurrency slots reserved for the current head block.
    pub(crate) additional_headersub_concurrency: usize,
}
86
/// Commands handled by the [`Daser`] worker.
#[derive(Debug)]
pub(crate) enum DaserCmd {
    /// Inform the worker of the highest height the pruner may remove.
    UpdateHighestPrunableHeight {
        value: u64,
    },

    /// Inform the worker how many blocks are currently prunable.
    UpdateNumberOfPrunableBlocks {
        value: u64,
    },

    /// Ask whether `height` can be pruned. The worker replies `false`
    /// when sampling of that height is currently in progress.
    WantToPrune {
        height: u64,
        respond_to: oneshot::Sender<bool>,
    },
}
118
impl Daser {
    /// Spawns the background worker and returns a handle to it.
    ///
    /// A fatal worker error is reported via [`NodeEvent::FatalDaserError`]
    /// and logged; it does not propagate to the caller of `start`.
    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
    where
        S: Store + 'static,
    {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        // The worker gets a child token so cancelling the handle stops it.
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Daser stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalDaserError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        })
    }

    /// Creates a `Daser` with no worker behind it, exposing the command
    /// receiver so tests can inspect the commands that would be sent.
    #[cfg(test)]
    pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let cancellation_token = CancellationToken::new();

        // No-op task; `join` resolves immediately.
        let join_handle = spawn(async {});

        let daser = Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        };

        let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };

        (daser, mock_handle)
    }

    /// Requests the worker to stop. Does not wait; see [`Daser::join`].
    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

    /// Waits until the worker task has finished.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    /// Sends a command to the worker, mapping a closed channel to
    /// [`DaserError::WorkerDied`].
    async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| DaserError::WorkerDied)
    }

    /// Asks the worker whether `height` may be pruned.
    ///
    /// Returns `false` if the height is being sampled at the moment.
    pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
        let (tx, rx) = oneshot::channel();

        self.send_command(DaserCmd::WantToPrune {
            height,
            respond_to: tx,
        })
        .await?;

        Ok(rx.await?)
    }

    /// Updates the highest block height the pruner is allowed to remove.
    pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
            .await
    }

    /// Updates the number of blocks currently eligible for pruning.
    pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
            .await
    }
}
206
207impl Drop for Daser {
208 fn drop(&mut self) {
209 self.stop();
210 }
211}
212
/// Internal state of the sampling worker task.
struct Worker<S>
where
    S: Store + 'static,
{
    /// Commands from the [`Daser`] handle.
    cmd_rx: mpsc::Receiver<DaserCmd>,
    /// Cancellation signal from the [`Daser`] handle.
    cancellation_token: CancellationToken,
    /// Publisher for sampling-related node events.
    event_pub: EventPublisher,
    /// P2P layer used for fetching samples.
    p2p: Arc<P2p>,
    /// Header and sampling-metadata store.
    store: Arc<S>,
    /// Upper bound on shares sampled per block (see `MAX_SAMPLES_NEEDED`).
    max_samples_needed: usize,
    /// In-flight per-block sampling futures resolving to `(height, timed_out)`.
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
    /// Heights still waiting to be sampled.
    queue: BlockRanges,
    /// Heights whose sampling timed out (or fell outside the window).
    timed_out: BlockRanges,
    /// Heights currently being sampled.
    ongoing: BlockRanges,
    /// Heights promised to the pruner; excluded from the queue.
    will_be_pruned: BlockRanges,
    /// Only blocks younger than this window are sampled.
    sampling_window: Duration,
    /// Maximum number of blocks sampled concurrently.
    concurrency_limit: usize,
    /// Extra concurrency reserved for the head block.
    additional_headersub_concurency: usize,
    /// Height of the current head of stored headers, if known.
    head_height: Option<u64>,
    /// Highest height the pruner may remove, as reported via command.
    highest_prunable_height: Option<u64>,
    /// Number of prunable blocks, as reported via command.
    num_of_prunable_blocks: u64,
}
235
impl<S> Worker<S>
where
    S: Store,
{
    /// Creates the worker from the startup arguments with empty bookkeeping state.
    fn new(
        args: DaserArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<DaserCmd>,
    ) -> Result<Worker<S>> {
        Ok(Worker {
            cmd_rx,
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            timed_out: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            will_be_pruned: BlockRanges::default(),
            sampling_window: args.sampling_window,
            concurrency_limit: args.concurrency_limit,
            additional_headersub_concurency: args.additional_headersub_concurrency,
            head_height: None,
            highest_prunable_height: None,
            num_of_prunable_blocks: 0,
        })
    }

    /// Main worker loop: alternates between waiting for peers and sampling
    /// until cancelled, or until a fatal error is returned.
    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            // Blocks until at least one peer is connected (or cancellation).
            self.connecting_event_loop().await;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            // Samples until all peers disconnect (Ok) or a fatal error (Err).
            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

    /// Waits for at least one connected peer, while still servicing commands.
    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Already connected; nothing to wait for.
        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
                // Keep handling commands even while disconnected so callers
                // (e.g. the pruner) are not blocked.
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
            }
        }
    }

    /// Sampling loop that runs while at least one peer is connected.
    ///
    /// Returns `Ok(())` when cancelled or when all peers disconnect;
    /// transient per-block state is reset on exit.
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Peers may have disconnected between the two event loops.
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.update_queue().await?;

        let mut first_report = true;

        loop {
            // Fill all available concurrency slots before waiting on events.
            while self.schedule_next_sample_block().await? {}

            if first_report {
                self.report().await?;
                first_report = false;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => self.report().await?,
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
                // A per-block sampling future finished.
                Some(res) = self.sampling_futs.next() => {
                    let (height, timed_out) = res?;

                    if timed_out {
                        // Remember the timeout so the height isn't re-queued.
                        self.timed_out.insert_relaxed(height..=height).expect("invalid height");
                    } else {
                        self.store.mark_as_sampled(height).await?;
                    }

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                },
                // New header(s) arrived: re-arm the notifier and rebuild the queue.
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.update_queue().await?;
                }
            }
        }

        // Drop transient state; it is rebuilt on the next connection.
        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.timed_out = BlockRanges::default();
        self.head_height = None;

        Ok(())
    }

    /// Logs a progress summary of sampled and in-flight blocks.
    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let sampled = self.store.get_sampled_ranges().await?;

        info!(
            "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
            sampled, &self.ongoing,
        );

        Ok(())
    }

    /// Dispatches a single command from the [`Daser`] handle.
    async fn on_cmd(&mut self, cmd: DaserCmd) {
        match cmd {
            DaserCmd::WantToPrune { height, respond_to } => {
                let res = self.on_want_to_prune(height).await;
                respond_to.maybe_send(res);
            }
            DaserCmd::UpdateHighestPrunableHeight { value } => {
                self.highest_prunable_height = Some(value);
            }
            DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
                self.num_of_prunable_blocks = value;
            }
        }
    }

    /// Decides whether `height` may be pruned.
    ///
    /// Refuses while the height is being sampled; otherwise removes it from
    /// the queue and records it so it is never scheduled again.
    async fn on_want_to_prune(&mut self, height: u64) -> bool {
        if self.ongoing.contains(height) {
            return false;
        }

        self.queue
            .remove_relaxed(height..=height)
            .expect("invalid height");
        self.will_be_pruned
            .insert_relaxed(height..=height)
            .expect("invalid height");

        true
    }

    /// Schedules sampling of the next queued block, if a concurrency slot is free.
    ///
    /// Returns `Ok(true)` when a block was scheduled and the caller should try
    /// again, `Ok(false)` when the queue is empty, concurrency is exhausted, or
    /// the next block falls outside the sampling window.
    async fn schedule_next_sample_block(&mut self) -> Result<bool> {
        let header = loop {
            // Queue is processed from the highest (newest) height down.
            let Some(height) = self.queue.pop_head() else {
                return Ok(false);
            };

            // Choose the effective concurrency limit for this height:
            // - 0 (pause sampling) for prunable heights while the pruner is
            //   behind by at least PRUNER_THRESHOLD blocks,
            // - boosted limit for the current head so fresh blocks from
            //   header-sub are sampled promptly,
            // - the configured limit otherwise.
            let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
                && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
            {
                0
            } else if height == self.head_height.unwrap_or(0) {
                self.concurrency_limit + self.additional_headersub_concurency
            } else {
                self.concurrency_limit
            };

            if self.sampling_futs.len() >= concurrency_limit {
                // No free slot: put the height back and stop scheduling.
                self.queue
                    .insert_relaxed(height..=height)
                    .expect("invalid height");
                return Ok(false);
            }

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    // Header disappeared (e.g. pruned); rebuild the queue and retry.
                    self.update_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height();
        let square_width = header.square_width();

        if !self.in_sampling_window(header.time()) {
            // Blocks below `height` are older, so they are outside the
            // window too: mark the whole range as timed out at once.
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.timed_out
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(false);
        }

        // Pick the random coordinates to sample and persist their CIDs
        // before any network request is made.
        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;
        self.store.update_sampling_metadata(height, cids).await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();
        let sampling_window = self.sampling_window;

        // Per-block sampling future: fetches all chosen shares concurrently
        // and resolves to `(height, timed_out)`.
        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            // Timeout shrinks as the block approaches the window's edge.
            let timeout = calc_timeout(header.time(), Time::now(), sampling_window);

            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p.get_sample(row, col, height, Some(timeout)).await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut sampling_timed_out = false;

            while let Some((row, column, res)) = futs.next().await {
                // A request timeout marks the whole block as timed out;
                // any other error is fatal for the worker.
                let timed_out = match res {
                    Ok(_) => false,
                    Err(P2pError::RequestTimedOut) => true,
                    Err(e) => return Err(e.into()),
                };

                if timed_out {
                    sampling_timed_out = true;
                }

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    timed_out,
                });
            }

            event_pub.send(NodeEvent::SamplingResult {
                height,
                timed_out: sampling_timed_out,
                took: now.elapsed(),
            });

            Ok((height, sampling_timed_out))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(true)
    }

    /// Rebuilds the queue as: stored headers minus everything already
    /// sampled, timed out, in flight, or promised to the pruner.
    async fn update_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let sampled = self.store.get_sampled_ranges().await?;

        self.head_height = stored.head();
        self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;

        Ok(())
    }

    /// Returns `true` if a block produced at `time` is still young enough
    /// to be sampled. Future-dated blocks are always considered in-window.
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        // Block is from the future: always in the window.
        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}
597
598fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
599 let sampling_window_end = now.saturating_sub(sampling_window);
600
601 header_time
602 .duration_since(sampling_window_end)
603 .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
604 .max(GET_SAMPLE_MIN_TIMEOUT)
605}
606
607fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
609 let samples_in_block = usize::from(square_width).pow(2);
610
611 if samples_in_block <= max_samples_needed {
614 return (0..square_width)
615 .flat_map(|row| (0..square_width).map(move |col| (row, col)))
616 .collect();
617 }
618
619 let mut indexes = HashSet::with_capacity(max_samples_needed);
620 let mut rng = rand::thread_rng();
621
622 while indexes.len() < max_samples_needed {
623 let row = rng.r#gen::<u16>() % square_width;
624 let col = rng.r#gen::<u16>() % square_width;
625 indexes.insert((row, col));
626 }
627
628 indexes
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634 use crate::events::{EventChannel, EventSubscriber};
635 use crate::node::SAMPLING_WINDOW;
636 use crate::p2p::P2pCmd;
637 use crate::p2p::shwap::convert_cid;
638 use crate::store::InMemoryStore;
639 use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
640 use crate::utils::OneshotResultSender;
641 use bytes::BytesMut;
642 use celestia_proto::bitswap::Block;
643 use celestia_types::sample::{Sample, SampleId};
644 use celestia_types::test_utils::{ExtendedHeaderGenerator, generate_dummy_eds};
645 use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
646 use cid::Cid;
647 use lumina_utils::test_utils::async_test;
648 use lumina_utils::time::sleep;
649 use prost::Message;
650 use std::collections::HashMap;
651 use std::time::Duration;
652
    // 1-based index of the share request that is answered with a timeout
    // whenever a test simulates sampling failure.
    const REQ_TIMEOUT_SHARE_NUM: usize = 2;
657
    /// Checks `calc_timeout` across the whole range: inside the window,
    /// at the minimum-timeout boundary, outside the window, and in the future.
    #[async_test]
    async fn check_calc_timeout() {
        let now = Time::now();
        let sampling_window = Duration::from_secs(60);

        // Brand-new block: full window remains.
        let header_time = now;
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(60)
        );

        let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(59)
        );

        let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(11)
        );

        // Remaining time equals the minimum timeout exactly.
        let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(10)
        );

        // From here on the minimum timeout is the floor.
        let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // Exactly at the window's edge.
        let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // Outside the window.
        let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        // Future-dated block: window plus the clock skew.
        let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(61)
        );
    }
715
    /// Happy path: blocks of growing square width are stored, sampled
    /// successfully, and marked as sampled.
    #[async_test]
    async fn received_valid_samples() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        // Nothing happens until a peer connects, and an empty store
        // produces no requests either.
        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            4,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            8,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            16,
            false,
        )
        .await;
    }
776
    /// A timed-out block (middle call with `simulate_sampling_timeout = true`)
    /// is not marked as sampled, while blocks before and after still are.
    #[async_test]
    async fn sampling_timeout() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        // This block times out and must not be marked as sampled.
        gen_and_sample_block(&mut handle, &mut generator, &store, &mut event_sub, 4, true).await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            8,
            false,
        )
        .await;
    }
820
821 #[async_test]
822 async fn backward_dasing() {
823 let (mock, mut handle) = P2p::mocked();
824 let store = Arc::new(InMemoryStore::new());
825 let events = EventChannel::new();
826
827 let _daser = Daser::start(DaserArgs {
828 event_pub: events.publisher(),
829 p2p: Arc::new(mock),
830 store: store.clone(),
831 sampling_window: SAMPLING_WINDOW,
832 concurrency_limit: 1,
833 additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
834 })
835 .unwrap();
836
837 let mut generator = ExtendedHeaderGenerator::new();
838
839 handle.expect_no_cmd().await;
840 handle.announce_peer_connected();
841 handle.expect_no_cmd().await;
842
843 let mut edses = Vec::new();
844 let mut headers = Vec::new();
845
846 for _ in 0..20 {
847 let eds = generate_dummy_eds(2);
848 let dah = DataAvailabilityHeader::from_eds(&eds);
849 let header = generator.next_with_dah(dah);
850
851 edses.push(eds);
852 headers.push(header);
853 }
854
855 store.insert(headers[4..=9].to_vec()).await.unwrap();
857
858 handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;
860
861 handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;
863
864 sleep(Duration::from_millis(10)).await;
866
867 store.insert(headers[15..=19].to_vec()).await.unwrap();
869
870 sleep(Duration::from_millis(10)).await;
872
873 handle_concurrent_get_shwap_cid(
875 &mut handle,
876 [(8, &edses[9], false), (20, &edses[19], false)],
877 )
878 .await;
879
880 handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;
882
883 handle.announce_all_peers_disconnected();
885
886 while let Some(cmd) = handle.try_recv_cmd().await {
889 match cmd {
890 P2pCmd::GetShwapCid { respond_to, .. } => {
891 let _ = respond_to.send(Err(P2pError::RequestTimedOut));
892 }
893 cmd => panic!("Unexpected command: {cmd:?}"),
894 }
895 }
896
897 handle.expect_no_cmd().await;
899
900 handle.announce_peer_connected();
902
903 handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;
905
906 for height in (16..=18).rev() {
908 let idx = height as usize - 1;
909 handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
910 }
911
912 for height in (5..=7).rev() {
914 let idx = height as usize - 1;
915 handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
916 }
917
918 handle.expect_no_cmd().await;
919
920 let eds = generate_dummy_eds(2);
922 let dah = DataAvailabilityHeader::from_eds(&eds);
923 let header = generator.next_with_dah(dah);
924 store.insert(header).await.unwrap();
925
926 handle_get_shwap_cid(&mut handle, 21, &eds, false).await;
928
929 handle.expect_no_cmd().await;
930 }
931
    /// Verifies the base concurrency limit, the extra header-sub slots for
    /// new heads, and that finishing a block frees a slot for the next one.
    #[async_test]
    async fn concurrency_limits() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let concurrency_limit = 10;
        let additional_headersub_concurrency = 5;
        // Empty-verified headers have square width 2, i.e. 4 shares each.
        let shares_per_block = 4;

        let mut generator = ExtendedHeaderGenerator::new();
        store
            .insert(generator.next_many_empty_verified(30))
            .await
            .unwrap();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit,
            additional_headersub_concurrency,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        // Keep the responders alive so the blocks stay "in flight".
        let mut hold_respond_channels = Vec::new();

        // Exactly `concurrency_limit` blocks may be in flight.
        for _ in 0..(concurrency_limit * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;

        // A new head is exempt from the base limit (header-sub slots).
        store
            .insert(generator.next_many_empty_verified(2))
            .await
            .unwrap();

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }
        handle.expect_no_cmd().await;

        // Finishing two blocks frees one regular slot (one was borrowed
        // by the head), so exactly one more block gets scheduled.
        stop_sampling_for(&mut hold_respond_channels, 29);
        stop_sampling_for(&mut hold_respond_channels, 30);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;

        // Each new head consumes one of the additional header-sub slots.
        for _ in 0..additional_headersub_concurrency {
            store
                .insert(generator.next_many_empty_verified(1))
                .await
                .unwrap();
            sleep(Duration::from_millis(10)).await;
        }

        for _ in 0..(additional_headersub_concurrency * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        // All slots (base + additional) are exhausted now.
        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;

        // Freeing a slot lets the pending head through.
        stop_sampling_for(&mut hold_respond_channels, 28);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;
    }
1043
    /// Verifies that sampling of prunable heights is paused while the pruner
    /// is behind by `PRUNER_THRESHOLD` blocks, and resumes below the threshold.
    #[async_test]
    async fn ratelimit() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: Duration::from_secs(60),
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        // Heights 1..=990 are produced far in the past (1s apart), so they
        // fall outside the 60s sampling window.
        let now = Time::now();
        let first_header_time = (now - Duration::from_secs(1024)).unwrap();
        generator.set_time(first_header_time, Duration::from_secs(1));
        store.insert(generator.next_many(990)).await.unwrap();

        let mut edses = HashMap::new();

        // Heights 991..=1000 are recent enough to be sampled.
        for height in 991..=1000 {
            let eds = generate_dummy_eds(2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = generator.next_with_dah(dah);

            edses.insert(height, eds);
            store.insert(header).await.unwrap();
        }

        // Everything up to 1000 is prunable and the backlog exceeds
        // PRUNER_THRESHOLD, so sampling of those heights is paused.
        daser.update_highest_prunable_block(1000).await.unwrap();
        daser.update_number_of_prunable_blocks(1000).await.unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        handle.expect_no_cmd().await;

        // Heights above the prunable limit (1001, 1002) are still sampled.
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;

        handle.expect_no_cmd().await;

        // Backlog drops below the threshold: prunable heights resume,
        // newest first.
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&1000).unwrap(),
            1000,
            false,
        )
        .await;
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&999).unwrap(),
            999,
            false,
        )
        .await;

        // Backlog reaches the threshold again. Height 998 was already
        // scheduled before the command arrived, so it still completes...
        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&998).unwrap(),
            998,
            false,
        )
        .await;

        // ...but no further prunable heights are scheduled.
        handle.expect_no_cmd().await;
    }
1160
1161 fn stop_sampling_for(
1162 responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
1163 height: u64,
1164 ) {
1165 let mut indexes = Vec::new();
1166
1167 for (idx, (cid, _)) in responders.iter().enumerate() {
1168 let sample_id: SampleId = cid.try_into().unwrap();
1169 if sample_id.block_height() == height {
1170 indexes.push(idx)
1171 }
1172 }
1173
1174 for idx in indexes.into_iter().rev() {
1175 let (_cid, respond_to) = responders.remove(idx);
1176 respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
1177 }
1178 }
1179
    /// Generates a block of `square_width`, inserts it into the store, and
    /// drives its full sampling flow, asserting no extra commands or events
    /// are produced afterwards.
    async fn gen_and_sample_block(
        handle: &mut MockP2pHandle,
        generator: &mut ExtendedHeaderGenerator,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        square_width: usize,
        simulate_sampling_timeout: bool,
    ) {
        let eds = generate_dummy_eds(square_width);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = generator.next_with_dah(dah);
        let height = header.height();

        store.insert(header).await.unwrap();

        sample_block(
            handle,
            store,
            event_sub,
            &eds,
            height,
            simulate_sampling_timeout,
        )
        .await;

        // Nothing else should be requested or emitted for this block.
        handle.expect_no_cmd().await;
        assert!(event_sub.try_recv().is_err());
    }
1210
    /// Drives the sampling of a single block: serves the share requests,
    /// then asserts the store state and the full event sequence
    /// (SamplingStarted, one ShareSamplingResult per share, SamplingResult).
    async fn sample_block(
        handle: &mut MockP2pHandle,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        eds: &ExtendedDataSquare,
        height: u64,
        simulate_sampling_timeout: bool,
    ) {
        let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;

        // Give the worker time to process the responses.
        sleep(Duration::from_millis(100)).await;

        // A timed-out block must not be marked as sampled.
        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
        assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);

        // The persisted CIDs must match exactly what was requested.
        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();
        assert_eq!(&sampling_metadata.cids, &cids);

        // First event: SamplingStarted with the chosen share coordinates.
        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        // One ShareSamplingResult per share; only the REQ_TIMEOUT_SHARE_NUM-th
        // one is marked timed out when simulating a timeout.
        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    timed_out,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    assert_eq!(
                        timed_out,
                        simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
                    );
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        assert!(remaining_shares.is_empty());

        // Final event: SamplingResult summarising the whole block.
        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingResult {
                height: ev_height,
                timed_out,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(timed_out, simulate_sampling_timeout);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }
    }
1295
    /// Serves `GetShwapCid` requests for several heights being sampled
    /// concurrently, answering each with a sample built from the matching EDS
    /// (or a timeout for the REQ_TIMEOUT_SHARE_NUM-th request of a height
    /// flagged for timeout simulation). Returns the sorted requested CIDs.
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        // Per-height serving state.
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_sampling_timeout: bool,
            needed_samples: usize,
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_sampling_timeout)| {
                let square_width = eds.square_width() as usize;
                // Mirrors the daser's sampling count: whole square, capped.
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_sampling_timeout,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            // Simulate a single timed-out share for this height.
            if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
                respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        cids.sort();
        cids
    }
1353
    /// Single-height convenience wrapper around `handle_concurrent_get_shwap_cid`.
    async fn handle_get_shwap_cid(
        handle: &mut MockP2pHandle,
        height: u64,
        eds: &ExtendedDataSquare,
        simulate_sampling_timeout: bool,
    ) -> Vec<Cid> {
        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
    }
1363
    /// Builds the wire-format bitswap block (protobuf-encoded) for the sample
    /// identified by `sample_id`, taking the share data from `eds`.
    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
        let sample = Sample::new(
            sample_id.row_index(),
            sample_id.column_index(),
            AxisType::Row,
            eds,
        )
        .unwrap();

        let mut container = BytesMut::new();
        sample.encode(&mut container);

        let block = Block {
            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
            container: container.to_vec(),
        };

        block.encode_to_vec()
    }
1383}