use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lumina_utils::executor::{JoinHandle, spawn};
use lumina_utils::time::{Instant, Interval};
use rand::Rng;
use tendermint::Time;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{BlockRanges, Store, StoreError};
use crate::utils::{OneshotSenderExt, TimeExt};

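// `MAX_SAMPLES_NEEDED` caps how many unique shares are sampled per block,
// `GET_SAMPLE_MIN_TIMEOUT` is the floor for a single `get_sample` request
// timeout, and `PRUNER_THRESHOLD` is the backlog of prunable blocks at which
// sampling of prunable heights is paused (see `schedule_next_sample_block`).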
const MAX_SAMPLES_NEEDED: usize = 16;
const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
const PRUNER_THRESHOLD: u64 = 512;

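// Default number of blocks sampled concurrently, plus the extra headroom
// granted to a block that just arrived from header-sub (the current head).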
pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;

type Result<T, E = DaserError> = std::result::Result<T, E>;

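/// Error produced by [`Daser`].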
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    #[error("Store: {0}")]
    Store(#[from] StoreError),

    #[error("Worker died")]
    WorkerDied,

    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl From<oneshot::error::RecvError> for DaserError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        DaserError::ChannelClosedUnexpectedly
    }
}

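/// Component responsible for data availability sampling of blocks from the
/// network. It owns a background worker and talks to it over a command channel.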
pub(crate) struct Daser {
    cmd_tx: mpsc::Sender<DaserCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
}

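/// Arguments used to configure the [`Daser`].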
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    pub(crate) p2p: Arc<P2p>,
    pub(crate) store: Arc<S>,
    pub(crate) event_pub: EventPublisher,
    pub(crate) sampling_window: Duration,
    pub(crate) concurrency_limit: usize,
    pub(crate) additional_headersub_concurrency: usize,
}

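/// Commands understood by the [`Daser`] worker.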
#[derive(Debug)]
pub(crate) enum DaserCmd {
    UpdateHighestPrunableHeight {
        value: u64,
    },

    UpdateNumberOfPrunableBlocks {
        value: u64,
    },

    WantToPrune {
        height: u64,
        respond_to: oneshot::Sender<bool>,
    },
}

impl Daser {
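    /// Create and start the [`Daser`] worker.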
    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
    where
        S: Store + 'static,
    {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Daser stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalDaserError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        })
    }

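    /// Create a mocked [`Daser`] whose command channel is exposed to tests
    /// through `MockDaserHandle`.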
    #[cfg(test)]
    pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let cancellation_token = CancellationToken::new();

        let join_handle = spawn(async {});

        let daser = Daser {
            cmd_tx,
            cancellation_token,
            join_handle,
        };

        let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };

        (daser, mock_handle)
    }

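    /// Stop the worker.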
    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

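    /// Wait until the worker is completely stopped.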
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| DaserError::WorkerDied)
    }

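    /// Ask the worker whether a block of the given height may be pruned.
    ///
    /// The worker refuses while sampling of that height is ongoing; otherwise
    /// it removes the height from its queue and marks it as about to be pruned.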
    pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
        let (tx, rx) = oneshot::channel();

        self.send_command(DaserCmd::WantToPrune {
            height,
            respond_to: tx,
        })
        .await?;

        Ok(rx.await?)
    }

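    /// Update the highest block height that the pruner may remove.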
    pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
            .await
    }

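    /// Update the number of blocks that are currently eligible for pruning.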
    pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
        self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
            .await
    }
}

impl Drop for Daser {
    fn drop(&mut self) {
        self.stop();
    }
}

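/// Background worker that drives the sampling loop.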
struct Worker<S>
where
    S: Store + 'static,
{
    cmd_rx: mpsc::Receiver<DaserCmd>,
    cancellation_token: CancellationToken,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    max_samples_needed: usize,
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
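    // Disjoint height sets tracked by the worker: `queue` holds heights still
    // to be sampled, `timed_out` those whose sampling window passed or whose
    // requests timed out, `ongoing` the heights being sampled right now, and
    // `will_be_pruned` the heights promised to the pruner.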
    queue: BlockRanges,
    timed_out: BlockRanges,
    ongoing: BlockRanges,
    will_be_pruned: BlockRanges,
    sampling_window: Duration,
    concurrency_limit: usize,
    additional_headersub_concurrency: usize,
    head_height: Option<u64>,
    highest_prunable_height: Option<u64>,
    num_of_prunable_blocks: u64,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: DaserArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<DaserCmd>,
    ) -> Result<Worker<S>> {
        Ok(Worker {
            cmd_rx,
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            timed_out: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            will_be_pruned: BlockRanges::default(),
            sampling_window: args.sampling_window,
            concurrency_limit: args.concurrency_limit,
            additional_headersub_concurrency: args.additional_headersub_concurrency,
            head_height: None,
            highest_prunable_height: None,
            num_of_prunable_blocks: 0,
        })
    }

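    /// Main worker loop: alternate between waiting for peers and sampling,
    /// until the cancellation token fires.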
    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

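    /// Wait until at least one peer is connected, while still serving commands.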
    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
            }
        }
    }

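    /// Sampling loop that runs while peers are connected: schedules sampling
    /// of queued blocks, reacts to new heads and finished samplings, and
    /// resets all transient state on exit.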
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.update_queue().await?;

        let mut first_report = true;

        loop {
            while self.schedule_next_sample_block().await? {}

            if first_report {
                self.report().await?;
                first_report = false;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => self.report().await?,
                Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
                Some(res) = self.sampling_futs.next() => {
                    let (height, timed_out) = res?;

                    if timed_out {
                        self.timed_out.insert_relaxed(height..=height).expect("invalid height");
                    } else {
                        self.store.mark_as_sampled(height).await?;
                    }

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                },
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.update_queue().await?;
                }
            }
        }

        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.timed_out = BlockRanges::default();
        self.head_height = None;

        Ok(())
    }

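    /// Log the current sampling progress.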
    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let sampled = self.store.get_sampled_ranges().await?;

        info!(
            "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
            sampled, &self.ongoing,
        );

        Ok(())
    }

    async fn on_cmd(&mut self, cmd: DaserCmd) {
        match cmd {
            DaserCmd::WantToPrune { height, respond_to } => {
                let res = self.on_want_to_prune(height).await;
                respond_to.maybe_send(res);
            }
            DaserCmd::UpdateHighestPrunableHeight { value } => {
                self.highest_prunable_height = Some(value);
            }
            DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
                self.num_of_prunable_blocks = value;
            }
        }
    }

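    /// Decide whether the pruner may remove the given height.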
    async fn on_want_to_prune(&mut self, height: u64) -> bool {
        if self.ongoing.contains(height) {
            return false;
        }

        self.queue
            .remove_relaxed(height..=height)
            .expect("invalid height");
        self.will_be_pruned
            .insert_relaxed(height..=height)
            .expect("invalid height");

        true
    }

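    /// Schedule sampling of the next block from the queue.
    ///
    /// Returns `Ok(false)` when nothing more can be scheduled right now,
    /// either because the queue is empty or a concurrency limit is reached.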
    async fn schedule_next_sample_block(&mut self) -> Result<bool> {
        let header = loop {
            let Some(height) = self.queue.pop_head() else {
                return Ok(false);
            };

            let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
                && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
            {
                0
            } else if height == self.head_height.unwrap_or(0) {
                self.concurrency_limit + self.additional_headersub_concurrency
            } else {
                self.concurrency_limit
            };

            if self.sampling_futs.len() >= concurrency_limit {
                self.queue
                    .insert_relaxed(height..=height)
                    .expect("invalid height");
                return Ok(false);
            }

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    self.update_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height();
        let square_width = header.square_width();

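        // The queue is processed from the newest height downwards, so if this
        // header is already outside the sampling window, every older one is
        // too. Drop them all and treat them as timed out.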
        if !self.in_sampling_window(header.time()) {
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.timed_out
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(false);
        }

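        // Pick random share coordinates for this block and record their CIDs
        // in the store before any request is sent.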
        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;
        self.store.update_sampling_metadata(height, cids).await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();
        let sampling_window = self.sampling_window;

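        // Sampling itself runs as a detached future; it resolves to the block
        // height and whether any of its share requests timed out.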
        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            let timeout = calc_timeout(header.time(), Time::now(), sampling_window);

            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p.get_sample(row, col, height, Some(timeout)).await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut sampling_timed_out = false;

            while let Some((row, column, res)) = futs.next().await {
                let timed_out = match res {
                    Ok(_) => false,
                    Err(P2pError::RequestTimedOut) => true,
                    Err(e) => return Err(e.into()),
                };

                if timed_out {
                    sampling_timed_out = true;
                }

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    timed_out,
                });
            }

            event_pub.send(NodeEvent::SamplingResult {
                height,
                timed_out: sampling_timed_out,
                took: now.elapsed(),
            });

            Ok((height, sampling_timed_out))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(true)
    }

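    /// Rebuild the sampling queue: everything stored but not yet sampled,
    /// excluding heights that timed out, are being sampled, or are about to
    /// be pruned.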
    async fn update_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let sampled = self.store.get_sampled_ranges().await?;

        self.head_height = stored.head();
        self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;

        Ok(())
    }

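    /// Check whether a block with the given timestamp is still within the
    /// sampling window. Headers with timestamps from the future are accepted.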
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}

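/// Calculate the `get_sample` timeout for a header: the time left until the
/// header leaves the sampling window, but never less than
/// `GET_SAMPLE_MIN_TIMEOUT`.
///
/// For example, with a 60s window, a header created 49s ago may still be
/// sampled for 11s, while one created 55s ago gets the 10s minimum.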
fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
    let sampling_window_end = now.saturating_sub(sampling_window);

    header_time
        .duration_since(sampling_window_end)
        .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
        .max(GET_SAMPLE_MIN_TIMEOUT)
}

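/// Select up to `max_samples_needed` distinct share coordinates. If the block
/// has fewer shares than that, all coordinates are returned.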
fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
    let samples_in_block = usize::from(square_width).pow(2);

    if samples_in_block <= max_samples_needed {
        return (0..square_width)
            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
            .collect();
    }

    let mut indexes = HashSet::with_capacity(max_samples_needed);
    let mut rng = rand::thread_rng();

    while indexes.len() < max_samples_needed {
        let row = rng.r#gen::<u16>() % square_width;
        let col = rng.r#gen::<u16>() % square_width;
        indexes.insert((row, col));
    }

    indexes
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{EventChannel, EventSubscriber};
    use crate::node::SAMPLING_WINDOW;
    use crate::p2p::P2pCmd;
    use crate::p2p::shwap::convert_cid;
    use crate::store::InMemoryStore;
    use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
    use crate::utils::OneshotResultSender;
    use bytes::BytesMut;
    use celestia_proto::bitswap::Block;
    use celestia_types::consts::appconsts::AppVersion;
    use celestia_types::sample::{Sample, SampleId};
    use celestia_types::test_utils::{ExtendedHeaderGenerator, generate_dummy_eds};
    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
    use cid::Cid;
    use lumina_utils::test_utils::async_test;
    use lumina_utils::time::sleep;
    use prost::Message;
    use std::collections::HashMap;
    use std::time::Duration;

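    // Which share request (1-based, in arrival order) is answered with a
    // timeout when a test simulates a sampling failure.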
    const REQ_TIMEOUT_SHARE_NUM: usize = 2;

    #[async_test]
    async fn check_calc_timeout() {
        let now = Time::now();
        let sampling_window = Duration::from_secs(60);

        let header_time = now;
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(60)
        );

        let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(59)
        );

        let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(11)
        );

        let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(10)
        );

        let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            GET_SAMPLE_MIN_TIMEOUT
        );

        let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
        assert_eq!(
            calc_timeout(header_time, now, sampling_window),
            Duration::from_secs(61)
        );
    }

    #[async_test]
    async fn received_valid_samples() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            4,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            8,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            16,
            false,
        )
        .await;
    }

    #[async_test]
    async fn sampling_timeout() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(&mut handle, &mut generator, &store, &mut event_sub, 4, true).await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            8,
            false,
        )
        .await;
    }

    #[async_test]
    async fn backward_dasing() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        let mut edses = Vec::new();
        let mut headers = Vec::new();

        for _ in 0..20 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = generator.next_with_dah(dah);

            edses.push(eds);
            headers.push(header);
        }

        store.insert(headers[4..=9].to_vec()).await.unwrap();

        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;

        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;

        sleep(Duration::from_millis(10)).await;

        store.insert(headers[15..=19].to_vec()).await.unwrap();

        sleep(Duration::from_millis(10)).await;

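        // Heights 16..=20 are now stored too. Thanks to the header-sub bonus,
        // the new head (20) is sampled concurrently with the backwards pass,
        // which has reached height 8 by now.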
        handle_concurrent_get_shwap_cid(
            &mut handle,
            [(8, &edses[7], false), (20, &edses[19], false)],
        )
        .await;

        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;

        handle.announce_all_peers_disconnected();

        while let Some(cmd) = handle.try_recv_cmd().await {
            match cmd {
                P2pCmd::GetShwapCid { respond_to, .. } => {
                    let _ = respond_to.send(Err(P2pError::RequestTimedOut));
                }
                cmd => panic!("Unexpected command: {cmd:?}"),
            }
        }

        handle.expect_no_cmd().await;

        handle.announce_peer_connected();

        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;

        for height in (16..=18).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        for height in (5..=7).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        handle.expect_no_cmd().await;

        let eds = generate_dummy_eds(2, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = generator.next_with_dah(dah);
        store.insert(header).await.unwrap();

        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;

        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn concurrency_limits() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let concurrency_limit = 10;
        let additional_headersub_concurrency = 5;
        let shares_per_block = 4;

        let mut generator = ExtendedHeaderGenerator::new();
        store
            .insert(generator.next_many_empty_verified(30))
            .await
            .unwrap();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: SAMPLING_WINDOW,
            concurrency_limit,
            additional_headersub_concurrency,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        let mut hold_respond_channels = Vec::new();

        for _ in 0..(concurrency_limit * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;

        store
            .insert(generator.next_many_empty_verified(2))
            .await
            .unwrap();

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }
        handle.expect_no_cmd().await;

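        // Completing two blocks frees two slots, but only one more block can
        // be scheduled before the base limit is hit again.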
        stop_sampling_for(&mut hold_respond_channels, 29);
        stop_sampling_for(&mut hold_respond_channels, 30);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;

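        // Each new head may exceed the base limit thanks to the header-sub
        // bonus, up to `concurrency_limit + additional_headersub_concurrency`
        // blocks in flight.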
        for _ in 0..additional_headersub_concurrency {
            store
                .insert(generator.next_many_empty_verified(1))
                .await
                .unwrap();
            sleep(Duration::from_millis(10)).await;
        }

        for _ in 0..(additional_headersub_concurrency * shares_per_block) {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;

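        // Freeing one block opens a slot for the pending head; yet another
        // new head on top of that exceeds the bonus limit and is not sampled.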
        stop_sampling_for(&mut hold_respond_channels, 28);

        for _ in 0..shares_per_block {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            hold_respond_channels.push((cid, respond_to));
        }

        handle.expect_no_cmd().await;
        store
            .insert(generator.next_many_empty_verified(1))
            .await
            .unwrap();
        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn ratelimit() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: Duration::from_secs(60),
            concurrency_limit: 1,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })
        .unwrap();

        let mut generator = ExtendedHeaderGenerator::new();

        let now = Time::now();
        let first_header_time = (now - Duration::from_secs(1024)).unwrap();
        generator.set_time(first_header_time, Duration::from_secs(1));
        store.insert(generator.next_many(990)).await.unwrap();

        let mut edses = HashMap::new();

        for height in 991..=1000 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = generator.next_with_dah(dah);

            edses.insert(height, eds);
            store.insert(header).await.unwrap();
        }

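        // Mark all 1000 stored blocks as prunable. That is above
        // PRUNER_THRESHOLD, so sampling of heights at or below 1000 is paused.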
        daser.update_highest_prunable_block(1000).await.unwrap();
        daser.update_number_of_prunable_blocks(1000).await.unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();

        handle.expect_no_cmd().await;

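        // New blocks above the highest prunable height are still sampled.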
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;
        gen_and_sample_block(
            &mut handle,
            &mut generator,
            &store,
            &mut event_sub,
            2,
            false,
        )
        .await;

        handle.expect_no_cmd().await;

        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

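        // Below the threshold the rate limit no longer applies and the
        // backwards pass resumes from the highest unsampled block.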
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&1000).unwrap(),
            1000,
            false,
        )
        .await;
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&999).unwrap(),
            999,
            false,
        )
        .await;

        daser
            .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
            .await
            .unwrap();
        sleep(Duration::from_millis(5)).await;

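        // Height 998 was already scheduled before the threshold update was
        // processed, so its share requests are already queued; once they are
        // answered, sampling of prunable heights stalls again.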
        sample_block(
            &mut handle,
            &store,
            &mut event_sub,
            edses.get(&998).unwrap(),
            998,
            false,
        )
        .await;

        handle.expect_no_cmd().await;
    }

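    /// Respond with a timeout to every held request that belongs to the given
    /// block height, removing those responders from the list.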
    fn stop_sampling_for(
        responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
        height: u64,
    ) {
        let mut indexes = Vec::new();

        for (idx, (cid, _)) in responders.iter().enumerate() {
            let sample_id: SampleId = cid.try_into().unwrap();
            if sample_id.block_height() == height {
                indexes.push(idx);
            }
        }

        for idx in indexes.into_iter().rev() {
            let (_cid, respond_to) = responders.remove(idx);
            respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
        }
    }

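    /// Generate a block with the given square width, insert it into the store
    /// and drive it through a full sampling round.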
    async fn gen_and_sample_block(
        handle: &mut MockP2pHandle,
        generator: &mut ExtendedHeaderGenerator,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        square_width: usize,
        simulate_sampling_timeout: bool,
    ) {
        let eds = generate_dummy_eds(square_width, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = generator.next_with_dah(dah);
        let height = header.height();

        store.insert(header).await.unwrap();

        sample_block(
            handle,
            store,
            event_sub,
            &eds,
            height,
            simulate_sampling_timeout,
        )
        .await;

        handle.expect_no_cmd().await;
        assert!(event_sub.try_recv().is_err());
    }

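    /// Serve all share requests for one block and assert that the store state
    /// and the emitted events match the expected outcome.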
    async fn sample_block(
        handle: &mut MockP2pHandle,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        eds: &ExtendedDataSquare,
        height: u64,
        simulate_sampling_timeout: bool,
    ) {
        let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;

        sleep(Duration::from_millis(100)).await;

        let sampled_ranges = store.get_sampled_ranges().await.unwrap();
        assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);

        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();
        assert_eq!(&sampling_metadata.cids, &cids);

        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    timed_out,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    assert_eq!(
                        timed_out,
                        simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
                    );
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        assert!(remaining_shares.is_empty());

        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingResult {
                height: ev_height,
                timed_out,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(timed_out, simulate_sampling_timeout);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }
    }

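    /// Serve interleaved share requests for several blocks at once, returning
    /// the sorted CIDs that were requested. A block marked with
    /// `simulate_sampling_timeout` gets a timeout for its
    /// `REQ_TIMEOUT_SHARE_NUM`-th request.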
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_sampling_timeout: bool,
            needed_samples: usize,
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_sampling_timeout)| {
                let square_width = eds.square_width() as usize;
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_sampling_timeout,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
                respond_to.send(Err(P2pError::RequestTimedOut)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        cids.sort();
        cids
    }

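    /// Convenience wrapper of `handle_concurrent_get_shwap_cid` for a single block.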
    async fn handle_get_shwap_cid(
        handle: &mut MockP2pHandle,
        height: u64,
        eds: &ExtendedDataSquare,
        simulate_sampling_timeout: bool,
    ) -> Vec<Cid> {
        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
    }

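    /// Build the bitswap block payload for a sample CID from the given EDS.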
    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
        let sample = Sample::new(
            sample_id.row_index(),
            sample_id.column_index(),
            AxisType::Row,
            eds,
        )
        .unwrap();

        let mut container = BytesMut::new();
        sample.encode(&mut container);

        let block = Block {
            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
            container: container.to_vec(),
        };

        block.encode_to_vec()
    }
}