1use std::collections::HashSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use futures::future::BoxFuture;
9use futures::stream::FuturesUnordered;
10use futures::{FutureExt, StreamExt};
11use lumina_utils::executor::{JoinHandle, spawn};
12use lumina_utils::time::{Instant, Interval};
13use rand::Rng;
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::events::{EventPublisher, NodeEvent};
21use crate::p2p::shwap::sample_cid;
22use crate::p2p::{P2p, P2pError};
23use crate::store::{BlockRanges, Store, StoreError};
24use crate::utils::{OneshotSenderExt, TimeExt};
25
26const MAX_SAMPLES_NEEDED: usize = 16;
27const GET_SAMPLE_MIN_TIMEOUT: Duration = Duration::from_secs(10);
28const PRUNER_THRESHOLD: u64 = 512;
29
30pub(crate) const DEFAULT_CONCURENCY_LIMIT: usize = 3;
31pub(crate) const DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY: usize = 5;
32
33type Result<T, E = DaserError> = std::result::Result<T, E>;
34
35#[derive(Debug, thiserror::Error)]
37pub enum DaserError {
38 #[error("P2p: {0}")]
40 P2p(#[from] P2pError),
41
42 #[error("Store: {0}")]
44 Store(#[from] StoreError),
45
46 #[error("Worker died")]
48 WorkerDied,
49
50 #[error("Channel closed unexpectedly")]
52 ChannelClosedUnexpectedly,
53}
54
55impl From<oneshot::error::RecvError> for DaserError {
56 fn from(_value: oneshot::error::RecvError) -> Self {
57 DaserError::ChannelClosedUnexpectedly
58 }
59}
60
61pub(crate) struct Daser {
63 cmd_tx: mpsc::Sender<DaserCmd>,
64 cancellation_token: CancellationToken,
65 join_handle: JoinHandle,
66}
67
68pub(crate) struct DaserArgs<S>
70where
71 S: Store,
72{
73 pub(crate) p2p: Arc<P2p>,
75 pub(crate) store: Arc<S>,
77 pub(crate) event_pub: EventPublisher,
79 pub(crate) sampling_window: Duration,
81 pub(crate) concurrency_limit: usize,
83 pub(crate) additional_headersub_concurrency: usize,
85}
86
87#[derive(Debug)]
88pub(crate) enum DaserCmd {
89 UpdateHighestPrunableHeight {
90 value: u64,
91 },
92
93 UpdateNumberOfPrunableBlocks {
94 value: u64,
95 },
96
97 WantToPrune {
114 height: u64,
115 respond_to: oneshot::Sender<bool>,
116 },
117}
118
119impl Daser {
120 pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
122 where
123 S: Store + 'static,
124 {
125 let cancellation_token = CancellationToken::new();
126 let event_pub = args.event_pub.clone();
127 let (cmd_tx, cmd_rx) = mpsc::channel(16);
128 let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;
129
130 let join_handle = spawn(async move {
131 if let Err(e) = worker.run().await {
132 error!("Daser stopped because of a fatal error: {e}");
133
134 event_pub.send(NodeEvent::FatalDaserError {
135 error: e.to_string(),
136 });
137 }
138 });
139
140 Ok(Daser {
141 cmd_tx,
142 cancellation_token,
143 join_handle,
144 })
145 }
146
147 #[cfg(test)]
148 pub(crate) fn mocked() -> (Self, crate::test_utils::MockDaserHandle) {
149 let (cmd_tx, cmd_rx) = mpsc::channel(16);
150 let cancellation_token = CancellationToken::new();
151
152 let join_handle = spawn(async {});
154
155 let daser = Daser {
156 cmd_tx,
157 cancellation_token,
158 join_handle,
159 };
160
161 let mock_handle = crate::test_utils::MockDaserHandle { cmd_rx };
162
163 (daser, mock_handle)
164 }
165
166 pub(crate) fn stop(&self) {
168 self.cancellation_token.cancel();
170 }
171
172 pub(crate) async fn join(&self) {
174 self.join_handle.join().await;
175 }
176
177 async fn send_command(&self, cmd: DaserCmd) -> Result<()> {
178 self.cmd_tx
179 .send(cmd)
180 .await
181 .map_err(|_| DaserError::WorkerDied)
182 }
183
184 pub(crate) async fn want_to_prune(&self, height: u64) -> Result<bool> {
185 let (tx, rx) = oneshot::channel();
186
187 self.send_command(DaserCmd::WantToPrune {
188 height,
189 respond_to: tx,
190 })
191 .await?;
192
193 Ok(rx.await?)
194 }
195
196 pub(crate) async fn update_highest_prunable_block(&self, value: u64) -> Result<()> {
197 self.send_command(DaserCmd::UpdateHighestPrunableHeight { value })
198 .await
199 }
200
201 pub(crate) async fn update_number_of_prunable_blocks(&self, value: u64) -> Result<()> {
202 self.send_command(DaserCmd::UpdateNumberOfPrunableBlocks { value })
203 .await
204 }
205}
206
207impl Drop for Daser {
208 fn drop(&mut self) {
209 self.stop();
210 }
211}
212
213struct Worker<S>
214where
215 S: Store + 'static,
216{
217 cmd_rx: mpsc::Receiver<DaserCmd>,
218 cancellation_token: CancellationToken,
219 event_pub: EventPublisher,
220 p2p: Arc<P2p>,
221 store: Arc<S>,
222 max_samples_needed: usize,
223 sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
224 queue: BlockRanges,
225 timed_out: BlockRanges,
226 ongoing: BlockRanges,
227 will_be_pruned: BlockRanges,
228 sampling_window: Duration,
229 concurrency_limit: usize,
230 additional_headersub_concurency: usize,
231 head_height: Option<u64>,
232 highest_prunable_height: Option<u64>,
233 num_of_prunable_blocks: u64,
234}
235
236impl<S> Worker<S>
237where
238 S: Store,
239{
240 fn new(
241 args: DaserArgs<S>,
242 cancellation_token: CancellationToken,
243 cmd_rx: mpsc::Receiver<DaserCmd>,
244 ) -> Result<Worker<S>> {
245 Ok(Worker {
246 cmd_rx,
247 cancellation_token,
248 event_pub: args.event_pub,
249 p2p: args.p2p,
250 store: args.store,
251 max_samples_needed: MAX_SAMPLES_NEEDED,
252 sampling_futs: FuturesUnordered::new(),
253 queue: BlockRanges::default(),
254 timed_out: BlockRanges::default(),
255 ongoing: BlockRanges::default(),
256 will_be_pruned: BlockRanges::default(),
257 sampling_window: args.sampling_window,
258 concurrency_limit: args.concurrency_limit,
259 additional_headersub_concurency: args.additional_headersub_concurrency,
260 head_height: None,
261 highest_prunable_height: None,
262 num_of_prunable_blocks: 0,
263 })
264 }
265
266 async fn run(&mut self) -> Result<()> {
267 loop {
268 if self.cancellation_token.is_cancelled() {
269 break;
270 }
271
272 self.connecting_event_loop().await;
273
274 if self.cancellation_token.is_cancelled() {
275 break;
276 }
277
278 self.connected_event_loop().await?;
279 }
280
281 debug!("Daser stopped");
282 Ok(())
283 }
284
285 async fn connecting_event_loop(&mut self) {
286 debug!("Entering connecting_event_loop");
287
288 let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
289
290 if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
292 return;
293 }
294
295 loop {
296 select! {
297 _ = self.cancellation_token.cancelled() => {
298 break;
299 }
300 _ = peer_tracker_info_watcher.changed() => {
301 if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
302 break;
303 }
304 }
305 Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
306 }
307 }
308 }
309
310 async fn connected_event_loop(&mut self) -> Result<()> {
311 debug!("Entering connected_event_loop");
312
313 let mut report_interval = Interval::new(Duration::from_secs(60));
314 let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
315
316 if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
318 warn!("All peers disconnected");
319 return Ok(());
320 }
321
322 let store = self.store.clone();
327 let mut wait_new_head = store.wait_new_head();
328
329 self.update_queue().await?;
330
331 let mut first_report = true;
332
333 loop {
334 while self.schedule_next_sample_block().await? {}
336
337 if first_report {
338 self.report().await?;
339 first_report = false;
340 }
341
342 select! {
343 _ = self.cancellation_token.cancelled() => {
344 break;
345 }
346 _ = peer_tracker_info_watcher.changed() => {
347 if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
348 warn!("All peers disconnected");
349 break;
350 }
351 }
352 _ = report_interval.tick() => self.report().await?,
353 Some(cmd) = self.cmd_rx.recv() => self.on_cmd(cmd).await,
354 Some(res) = self.sampling_futs.next() => {
355 let (height, timed_out) = res?;
358
359 if timed_out {
360 self.timed_out.insert_relaxed(height..=height).expect("invalid height");
361 } else {
362 self.store.mark_as_sampled(height).await?;
363 }
364
365 self.ongoing.remove_relaxed(height..=height).expect("invalid height");
366 },
367 _ = &mut wait_new_head => {
368 wait_new_head = store.wait_new_head();
369 self.update_queue().await?;
370 }
371 }
372 }
373
374 self.sampling_futs.clear();
375 self.queue = BlockRanges::default();
376 self.ongoing = BlockRanges::default();
377 self.timed_out = BlockRanges::default();
378 self.head_height = None;
379
380 Ok(())
381 }
382
383 #[instrument(skip_all)]
384 async fn report(&mut self) -> Result<()> {
385 let sampled = self.store.get_sampled_ranges().await?;
386
387 info!(
388 "data sampling: stored and sampled blocks: {}, ongoing blocks: {}",
389 sampled, &self.ongoing,
390 );
391
392 Ok(())
393 }
394
395 async fn on_cmd(&mut self, cmd: DaserCmd) {
396 match cmd {
397 DaserCmd::WantToPrune { height, respond_to } => {
398 let res = self.on_want_to_prune(height).await;
399 respond_to.maybe_send(res);
400 }
401 DaserCmd::UpdateHighestPrunableHeight { value } => {
402 self.highest_prunable_height = Some(value);
403 }
404 DaserCmd::UpdateNumberOfPrunableBlocks { value } => {
405 self.num_of_prunable_blocks = value;
406 }
407 }
408 }
409
410 async fn on_want_to_prune(&mut self, height: u64) -> bool {
411 if self.ongoing.contains(height) {
413 return false;
414 }
415
416 self.queue
418 .remove_relaxed(height..=height)
419 .expect("invalid height");
420 self.will_be_pruned
422 .insert_relaxed(height..=height)
423 .expect("invalid height");
424
425 true
426 }
427
428 async fn schedule_next_sample_block(&mut self) -> Result<bool> {
429 let header = loop {
431 let Some(height) = self.queue.pop_head() else {
432 return Ok(false);
433 };
434
435 let concurrency_limit = if height <= self.highest_prunable_height.unwrap_or(0)
436 && self.num_of_prunable_blocks >= PRUNER_THRESHOLD
437 {
438 0
441 } else if height == self.head_height.unwrap_or(0) {
442 self.concurrency_limit + self.additional_headersub_concurency
444 } else {
445 self.concurrency_limit
446 };
447
448 if self.sampling_futs.len() >= concurrency_limit {
449 self.queue
452 .insert_relaxed(height..=height)
453 .expect("invalid height");
454 return Ok(false);
455 }
456
457 match self.store.get_by_height(height).await {
458 Ok(header) => break header,
459 Err(StoreError::NotFound) => {
460 self.update_queue().await?;
463 }
464 Err(e) => return Err(e.into()),
465 }
466 };
467
468 let height = header.height().value();
469 let square_width = header.dah.square_width();
470
471 if !self.in_sampling_window(header.time()) {
473 self.queue
476 .remove_relaxed(1..=height)
477 .expect("invalid height");
478 self.timed_out
479 .insert_relaxed(1..=height)
480 .expect("invalid height");
481 return Ok(false);
482 }
483
484 let share_indexes = random_indexes(square_width, self.max_samples_needed);
486
487 let cids = share_indexes
490 .iter()
491 .map(|(row, col)| sample_cid(*row, *col, height))
492 .collect::<Result<Vec<_>, _>>()?;
493 self.store.update_sampling_metadata(height, cids).await?;
494
495 let p2p = self.p2p.clone();
496 let event_pub = self.event_pub.clone();
497 let sampling_window = self.sampling_window;
498
499 let fut = async move {
501 let now = Instant::now();
502
503 event_pub.send(NodeEvent::SamplingStarted {
504 height,
505 square_width,
506 shares: share_indexes.iter().copied().collect(),
507 });
508
509 let timeout = calc_timeout(header.time(), Time::now(), sampling_window);
511
512 let mut futs = share_indexes
514 .into_iter()
515 .map(|(row, col)| {
516 let p2p = p2p.clone();
517
518 async move {
519 let res = p2p.get_sample(row, col, height, Some(timeout)).await;
520 (row, col, res)
521 }
522 })
523 .collect::<FuturesUnordered<_>>();
524
525 let mut sampling_timed_out = false;
526
527 while let Some((row, column, res)) = futs.next().await {
529 let timed_out = match res {
530 Ok(_) => false,
531 Err(P2pError::BitswapQueryTimeout) => true,
536 Err(e) => return Err(e.into()),
537 };
538
539 if timed_out {
540 sampling_timed_out = true;
541 }
542
543 event_pub.send(NodeEvent::ShareSamplingResult {
544 height,
545 square_width,
546 row,
547 column,
548 timed_out,
549 });
550 }
551
552 event_pub.send(NodeEvent::SamplingResult {
553 height,
554 timed_out: sampling_timed_out,
555 took: now.elapsed(),
556 });
557
558 Ok((height, sampling_timed_out))
559 }
560 .boxed();
561
562 self.sampling_futs.push(fut);
563 self.ongoing
564 .insert_relaxed(height..=height)
565 .expect("invalid height");
566
567 Ok(true)
568 }
569
570 async fn update_queue(&mut self) -> Result<()> {
572 let stored = self.store.get_stored_header_ranges().await?;
573 let sampled = self.store.get_sampled_ranges().await?;
574
575 self.head_height = stored.head();
576 self.queue = stored - &sampled - &self.timed_out - &self.ongoing - &self.will_be_pruned;
577
578 Ok(())
579 }
580
581 fn in_sampling_window(&self, time: Time) -> bool {
583 let now = Time::now();
584
585 if now < time {
587 return true;
588 }
589
590 let Ok(age) = now.duration_since(time) else {
591 return false;
592 };
593
594 age <= self.sampling_window
595 }
596}
597
598fn calc_timeout(header_time: Time, now: Time, sampling_window: Duration) -> Duration {
599 let sampling_window_end = now.saturating_sub(sampling_window);
600
601 header_time
602 .duration_since(sampling_window_end)
603 .unwrap_or(GET_SAMPLE_MIN_TIMEOUT)
604 .max(GET_SAMPLE_MIN_TIMEOUT)
605}
606
607fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
609 let samples_in_block = usize::from(square_width).pow(2);
610
611 if samples_in_block <= max_samples_needed {
614 return (0..square_width)
615 .flat_map(|row| (0..square_width).map(move |col| (row, col)))
616 .collect();
617 }
618
619 let mut indexes = HashSet::with_capacity(max_samples_needed);
620 let mut rng = rand::thread_rng();
621
622 while indexes.len() < max_samples_needed {
623 let row = rng.r#gen::<u16>() % square_width;
624 let col = rng.r#gen::<u16>() % square_width;
625 indexes.insert((row, col));
626 }
627
628 indexes
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634 use crate::events::{EventChannel, EventSubscriber};
635 use crate::node::SAMPLING_WINDOW;
636 use crate::p2p::P2pCmd;
637 use crate::p2p::shwap::convert_cid;
638 use crate::store::InMemoryStore;
639 use crate::test_utils::{ExtendedHeaderGeneratorExt, MockP2pHandle};
640 use crate::utils::OneshotResultSender;
641 use bytes::BytesMut;
642 use celestia_proto::bitswap::Block;
643 use celestia_types::consts::appconsts::AppVersion;
644 use celestia_types::sample::{Sample, SampleId};
645 use celestia_types::test_utils::{ExtendedHeaderGenerator, generate_dummy_eds};
646 use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
647 use cid::Cid;
648 use lumina_utils::test_utils::async_test;
649 use lumina_utils::time::sleep;
650 use prost::Message;
651 use std::collections::HashMap;
652 use std::time::Duration;
653
654 const REQ_TIMEOUT_SHARE_NUM: usize = 2;
658
659 #[async_test]
660 async fn check_calc_timeout() {
661 let now = Time::now();
662 let sampling_window = Duration::from_secs(60);
663
664 let header_time = now;
665 assert_eq!(
666 calc_timeout(header_time, now, sampling_window),
667 Duration::from_secs(60)
668 );
669
670 let header_time = now.checked_sub(Duration::from_secs(1)).unwrap();
671 assert_eq!(
672 calc_timeout(header_time, now, sampling_window),
673 Duration::from_secs(59)
674 );
675
676 let header_time = now.checked_sub(Duration::from_secs(49)).unwrap();
677 assert_eq!(
678 calc_timeout(header_time, now, sampling_window),
679 Duration::from_secs(11)
680 );
681
682 let header_time = now.checked_sub(Duration::from_secs(50)).unwrap();
683 assert_eq!(
684 calc_timeout(header_time, now, sampling_window),
685 Duration::from_secs(10)
686 );
687
688 let header_time = now.checked_sub(Duration::from_secs(51)).unwrap();
690 assert_eq!(
691 calc_timeout(header_time, now, sampling_window),
692 GET_SAMPLE_MIN_TIMEOUT
693 );
694
695 let header_time = now.checked_sub(Duration::from_secs(60)).unwrap();
697 assert_eq!(
698 calc_timeout(header_time, now, sampling_window),
699 GET_SAMPLE_MIN_TIMEOUT
700 );
701
702 let header_time = now.checked_sub(Duration::from_secs(61)).unwrap();
704 assert_eq!(
705 calc_timeout(header_time, now, sampling_window),
706 GET_SAMPLE_MIN_TIMEOUT
707 );
708
709 let header_time = now.checked_add(Duration::from_secs(1)).unwrap();
711 assert_eq!(
712 calc_timeout(header_time, now, sampling_window),
713 Duration::from_secs(61)
714 );
715 }
716
717 #[async_test]
718 async fn received_valid_samples() {
719 let (mock, mut handle) = P2p::mocked();
720 let store = Arc::new(InMemoryStore::new());
721 let events = EventChannel::new();
722 let mut event_sub = events.subscribe();
723
724 let _daser = Daser::start(DaserArgs {
725 event_pub: events.publisher(),
726 p2p: Arc::new(mock),
727 store: store.clone(),
728 sampling_window: SAMPLING_WINDOW,
729 concurrency_limit: 1,
730 additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
731 })
732 .unwrap();
733
734 let mut generator = ExtendedHeaderGenerator::new();
735
736 handle.expect_no_cmd().await;
737 handle.announce_peer_connected();
738 handle.expect_no_cmd().await;
739
740 gen_and_sample_block(
741 &mut handle,
742 &mut generator,
743 &store,
744 &mut event_sub,
745 2,
746 false,
747 )
748 .await;
749 gen_and_sample_block(
750 &mut handle,
751 &mut generator,
752 &store,
753 &mut event_sub,
754 4,
755 false,
756 )
757 .await;
758 gen_and_sample_block(
759 &mut handle,
760 &mut generator,
761 &store,
762 &mut event_sub,
763 8,
764 false,
765 )
766 .await;
767 gen_and_sample_block(
768 &mut handle,
769 &mut generator,
770 &store,
771 &mut event_sub,
772 16,
773 false,
774 )
775 .await;
776 }
777
778 #[async_test]
779 async fn sampling_timeout() {
780 let (mock, mut handle) = P2p::mocked();
781 let store = Arc::new(InMemoryStore::new());
782 let events = EventChannel::new();
783 let mut event_sub = events.subscribe();
784
785 let _daser = Daser::start(DaserArgs {
786 event_pub: events.publisher(),
787 p2p: Arc::new(mock),
788 store: store.clone(),
789 sampling_window: SAMPLING_WINDOW,
790 concurrency_limit: 1,
791 additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
792 })
793 .unwrap();
794
795 let mut generator = ExtendedHeaderGenerator::new();
796
797 handle.expect_no_cmd().await;
798 handle.announce_peer_connected();
799 handle.expect_no_cmd().await;
800
801 gen_and_sample_block(
802 &mut handle,
803 &mut generator,
804 &store,
805 &mut event_sub,
806 2,
807 false,
808 )
809 .await;
810 gen_and_sample_block(&mut handle, &mut generator, &store, &mut event_sub, 4, true).await;
811 gen_and_sample_block(
812 &mut handle,
813 &mut generator,
814 &store,
815 &mut event_sub,
816 8,
817 false,
818 )
819 .await;
820 }
821
822 #[async_test]
823 async fn backward_dasing() {
824 let (mock, mut handle) = P2p::mocked();
825 let store = Arc::new(InMemoryStore::new());
826 let events = EventChannel::new();
827
828 let _daser = Daser::start(DaserArgs {
829 event_pub: events.publisher(),
830 p2p: Arc::new(mock),
831 store: store.clone(),
832 sampling_window: SAMPLING_WINDOW,
833 concurrency_limit: 1,
834 additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
835 })
836 .unwrap();
837
838 let mut generator = ExtendedHeaderGenerator::new();
839
840 handle.expect_no_cmd().await;
841 handle.announce_peer_connected();
842 handle.expect_no_cmd().await;
843
844 let mut edses = Vec::new();
845 let mut headers = Vec::new();
846
847 for _ in 0..20 {
848 let eds = generate_dummy_eds(2, AppVersion::V2);
849 let dah = DataAvailabilityHeader::from_eds(&eds);
850 let header = generator.next_with_dah(dah);
851
852 edses.push(eds);
853 headers.push(header);
854 }
855
856 store.insert(headers[4..=9].to_vec()).await.unwrap();
858
859 handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;
861
862 handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;
864
865 sleep(Duration::from_millis(10)).await;
867
868 store.insert(headers[15..=19].to_vec()).await.unwrap();
870
871 sleep(Duration::from_millis(10)).await;
873
874 handle_concurrent_get_shwap_cid(
876 &mut handle,
877 [(8, &edses[9], false), (20, &edses[19], false)],
878 )
879 .await;
880
881 handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;
883
884 handle.announce_all_peers_disconnected();
886
887 while let Some(cmd) = handle.try_recv_cmd().await {
890 match cmd {
891 P2pCmd::GetShwapCid { respond_to, .. } => {
892 let _ = respond_to.send(Err(P2pError::BitswapQueryTimeout));
893 }
894 cmd => panic!("Unexpected command: {cmd:?}"),
895 }
896 }
897
898 handle.expect_no_cmd().await;
900
901 handle.announce_peer_connected();
903
904 handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;
906
907 for height in (16..=18).rev() {
909 let idx = height as usize - 1;
910 handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
911 }
912
913 for height in (5..=7).rev() {
915 let idx = height as usize - 1;
916 handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
917 }
918
919 handle.expect_no_cmd().await;
920
921 let eds = generate_dummy_eds(2, AppVersion::V2);
923 let dah = DataAvailabilityHeader::from_eds(&eds);
924 let header = generator.next_with_dah(dah);
925 store.insert(header).await.unwrap();
926
927 handle_get_shwap_cid(&mut handle, 21, &eds, false).await;
929
930 handle.expect_no_cmd().await;
931 }
932
933 #[async_test]
934 async fn concurrency_limits() {
935 let (mock, mut handle) = P2p::mocked();
936 let store = Arc::new(InMemoryStore::new());
937 let events = EventChannel::new();
938
939 let concurrency_limit = 10;
941 let additional_headersub_concurrency = 5;
944 let shares_per_block = 4;
947
948 let mut generator = ExtendedHeaderGenerator::new();
949 store
950 .insert(generator.next_many_verified(30))
951 .await
952 .unwrap();
953
954 let _daser = Daser::start(DaserArgs {
955 event_pub: events.publisher(),
956 p2p: Arc::new(mock),
957 store: store.clone(),
958 sampling_window: SAMPLING_WINDOW,
959 concurrency_limit,
960 additional_headersub_concurrency,
961 })
962 .unwrap();
963
964 handle.expect_no_cmd().await;
965 handle.announce_peer_connected();
966
967 let mut hold_respond_channels = Vec::new();
968
969 for _ in 0..(concurrency_limit * shares_per_block) {
970 let (cid, respond_to) = handle.expect_get_shwap_cid().await;
971 hold_respond_channels.push((cid, respond_to));
972 }
973
974 handle.expect_no_cmd().await;
976
977 store.insert(generator.next_many_verified(2)).await.unwrap();
979
980 for _ in 0..shares_per_block {
981 let (cid, respond_to) = handle.expect_get_shwap_cid().await;
982 hold_respond_channels.push((cid, respond_to));
983 }
984 handle.expect_no_cmd().await;
985
986 stop_sampling_for(&mut hold_respond_channels, 29);
990 stop_sampling_for(&mut hold_respond_channels, 30);
991
992 for _ in 0..shares_per_block {
994 let (cid, respond_to) = handle.expect_get_shwap_cid().await;
995 hold_respond_channels.push((cid, respond_to));
996 }
997
998 handle.expect_no_cmd().await;
1000
1001 for _ in 0..additional_headersub_concurrency {
1003 store.insert(generator.next_many_verified(1)).await.unwrap();
1004 sleep(Duration::from_millis(10)).await;
1006 }
1007
1008 for _ in 0..(additional_headersub_concurrency * shares_per_block) {
1009 let (cid, respond_to) = handle.expect_get_shwap_cid().await;
1010 hold_respond_channels.push((cid, respond_to));
1011 }
1012
1013 handle.expect_no_cmd().await;
1015 store.insert(generator.next_many_verified(1)).await.unwrap();
1016 handle.expect_no_cmd().await;
1017
1018 stop_sampling_for(&mut hold_respond_channels, 28);
1021
1022 for _ in 0..shares_per_block {
1023 let (cid, respond_to) = handle.expect_get_shwap_cid().await;
1024 hold_respond_channels.push((cid, respond_to));
1025 }
1026
1027 handle.expect_no_cmd().await;
1029 store.insert(generator.next_many_verified(1)).await.unwrap();
1030 handle.expect_no_cmd().await;
1031 }
1032
1033 #[async_test]
1034 async fn ratelimit() {
1035 let (mock, mut handle) = P2p::mocked();
1036 let store = Arc::new(InMemoryStore::new());
1037 let events = EventChannel::new();
1038 let mut event_sub = events.subscribe();
1039
1040 let daser = Daser::start(DaserArgs {
1041 event_pub: events.publisher(),
1042 p2p: Arc::new(mock),
1043 store: store.clone(),
1044 sampling_window: Duration::from_secs(60),
1045 concurrency_limit: 1,
1046 additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
1047 })
1048 .unwrap();
1049
1050 let mut generator = ExtendedHeaderGenerator::new();
1051
1052 let now = Time::now();
1053 let first_header_time = (now - Duration::from_secs(1024)).unwrap();
1054 generator.set_time(first_header_time, Duration::from_secs(1));
1055 store.insert(generator.next_many(990)).await.unwrap();
1056
1057 let mut edses = HashMap::new();
1058
1059 for height in 991..=1000 {
1060 let eds = generate_dummy_eds(2, AppVersion::V2);
1061 let dah = DataAvailabilityHeader::from_eds(&eds);
1062 let header = generator.next_with_dah(dah);
1063
1064 edses.insert(height, eds);
1065 store.insert(header).await.unwrap();
1066 }
1067
1068 daser.update_highest_prunable_block(1000).await.unwrap();
1070 daser.update_number_of_prunable_blocks(1000).await.unwrap();
1071
1072 handle.expect_no_cmd().await;
1073 handle.announce_peer_connected();
1074
1075 handle.expect_no_cmd().await;
1077
1078 gen_and_sample_block(
1080 &mut handle,
1081 &mut generator,
1082 &store,
1083 &mut event_sub,
1084 2,
1085 false,
1086 )
1087 .await;
1088 gen_and_sample_block(
1089 &mut handle,
1090 &mut generator,
1091 &store,
1092 &mut event_sub,
1093 2,
1094 false,
1095 )
1096 .await;
1097
1098 handle.expect_no_cmd().await;
1100
1101 daser
1103 .update_number_of_prunable_blocks(PRUNER_THRESHOLD - 1)
1104 .await
1105 .unwrap();
1106 sleep(Duration::from_millis(5)).await;
1107
1108 sample_block(
1110 &mut handle,
1111 &store,
1112 &mut event_sub,
1113 edses.get(&1000).unwrap(),
1114 1000,
1115 false,
1116 )
1117 .await;
1118 sample_block(
1119 &mut handle,
1120 &store,
1121 &mut event_sub,
1122 edses.get(&999).unwrap(),
1123 999,
1124 false,
1125 )
1126 .await;
1127
1128 daser
1130 .update_number_of_prunable_blocks(PRUNER_THRESHOLD)
1131 .await
1132 .unwrap();
1133 sleep(Duration::from_millis(5)).await;
1134
1135 sample_block(
1137 &mut handle,
1138 &store,
1139 &mut event_sub,
1140 edses.get(&998).unwrap(),
1141 998,
1142 false,
1143 )
1144 .await;
1145
1146 handle.expect_no_cmd().await;
1148 }
1149
1150 fn stop_sampling_for(
1151 responders: &mut Vec<(Cid, OneshotResultSender<Vec<u8>, P2pError>)>,
1152 height: u64,
1153 ) {
1154 let mut indexes = Vec::new();
1155
1156 for (idx, (cid, _)) in responders.iter().enumerate() {
1157 let sample_id: SampleId = cid.try_into().unwrap();
1158 if sample_id.block_height() == height {
1159 indexes.push(idx)
1160 }
1161 }
1162
1163 for idx in indexes.into_iter().rev() {
1164 let (_cid, respond_to) = responders.remove(idx);
1165 respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
1166 }
1167 }
1168
1169 async fn gen_and_sample_block(
1170 handle: &mut MockP2pHandle,
1171 generator: &mut ExtendedHeaderGenerator,
1172 store: &InMemoryStore,
1173 event_sub: &mut EventSubscriber,
1174 square_width: usize,
1175 simulate_sampling_timeout: bool,
1176 ) {
1177 let eds = generate_dummy_eds(square_width, AppVersion::V2);
1178 let dah = DataAvailabilityHeader::from_eds(&eds);
1179 let header = generator.next_with_dah(dah);
1180 let height = header.height().value();
1181
1182 store.insert(header).await.unwrap();
1183
1184 sample_block(
1185 handle,
1186 store,
1187 event_sub,
1188 &eds,
1189 height,
1190 simulate_sampling_timeout,
1191 )
1192 .await;
1193
1194 handle.expect_no_cmd().await;
1197 assert!(event_sub.try_recv().is_err());
1198 }
1199
1200 async fn sample_block(
1201 handle: &mut MockP2pHandle,
1202 store: &InMemoryStore,
1203 event_sub: &mut EventSubscriber,
1204 eds: &ExtendedDataSquare,
1205 height: u64,
1206 simulate_sampling_timeout: bool,
1207 ) {
1208 let cids = handle_get_shwap_cid(handle, height, eds, simulate_sampling_timeout).await;
1209
1210 sleep(Duration::from_millis(100)).await;
1212
1213 let sampled_ranges = store.get_sampled_ranges().await.unwrap();
1215 assert_eq!(sampled_ranges.contains(height), !simulate_sampling_timeout);
1216
1217 let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
1219 sampling_metadata.cids.sort();
1220 assert_eq!(&sampling_metadata.cids, &cids);
1221
1222 let mut remaining_shares = match event_sub.try_recv().unwrap().event {
1224 NodeEvent::SamplingStarted {
1225 height: ev_height,
1226 square_width,
1227 shares,
1228 } => {
1229 assert_eq!(ev_height, height);
1230 assert_eq!(square_width, eds.square_width());
1231
1232 let mut cids = shares
1234 .iter()
1235 .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
1236 .collect::<Vec<_>>();
1237 cids.sort();
1238 assert_eq!(&sampling_metadata.cids, &cids);
1239
1240 shares.into_iter().collect::<HashSet<_>>()
1241 }
1242 ev => panic!("Unexpected event: {ev}"),
1243 };
1244
1245 for i in 1..=remaining_shares.len() {
1247 match event_sub.try_recv().unwrap().event {
1248 NodeEvent::ShareSamplingResult {
1249 height: ev_height,
1250 square_width,
1251 row,
1252 column,
1253 timed_out,
1254 } => {
1255 assert_eq!(ev_height, height);
1256 assert_eq!(square_width, eds.square_width());
1257 assert_eq!(
1258 timed_out,
1259 simulate_sampling_timeout && i == REQ_TIMEOUT_SHARE_NUM
1260 );
1261 assert!(remaining_shares.remove(&(row, column)));
1263 }
1264 ev => panic!("Unexpected event: {ev}"),
1265 }
1266 }
1267
1268 assert!(remaining_shares.is_empty());
1269
1270 match event_sub.try_recv().unwrap().event {
1272 NodeEvent::SamplingResult {
1273 height: ev_height,
1274 timed_out,
1275 took,
1276 } => {
1277 assert_eq!(ev_height, height);
1278 assert_eq!(timed_out, simulate_sampling_timeout);
1279 assert_ne!(took, Duration::default());
1280 }
1281 ev => panic!("Unexpected event: {ev}"),
1282 }
1283 }
1284
1285 async fn handle_concurrent_get_shwap_cid<const N: usize>(
1287 handle: &mut MockP2pHandle,
1288 handling_args: [(u64, &ExtendedDataSquare, bool); N],
1289 ) -> Vec<Cid> {
1290 struct Info<'a> {
1291 eds: &'a ExtendedDataSquare,
1292 simulate_sampling_timeout: bool,
1293 needed_samples: usize,
1294 requests_count: usize,
1295 }
1296
1297 let mut infos = handling_args
1298 .into_iter()
1299 .map(|(height, eds, simulate_sampling_timeout)| {
1300 let square_width = eds.square_width() as usize;
1301 let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);
1302
1303 (
1304 height,
1305 Info {
1306 eds,
1307 simulate_sampling_timeout,
1308 needed_samples,
1309 requests_count: 0,
1310 },
1311 )
1312 })
1313 .collect::<HashMap<_, _>>();
1314
1315 let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
1316 let mut cids = Vec::with_capacity(needed_samples_sum);
1317
1318 for _ in 0..needed_samples_sum {
1319 let (cid, respond_to) = handle.expect_get_shwap_cid().await;
1320 cids.push(cid);
1321
1322 let sample_id: SampleId = cid.try_into().unwrap();
1323 let info = infos
1324 .get_mut(&sample_id.block_height())
1325 .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));
1326
1327 info.requests_count += 1;
1328
1329 if info.simulate_sampling_timeout && info.requests_count == REQ_TIMEOUT_SHARE_NUM {
1331 respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
1332 continue;
1333 }
1334
1335 let sample = gen_sample_of_cid(sample_id, info.eds).await;
1336 respond_to.send(Ok(sample)).unwrap();
1337 }
1338
1339 cids.sort();
1340 cids
1341 }
1342
1343 async fn handle_get_shwap_cid(
1345 handle: &mut MockP2pHandle,
1346 height: u64,
1347 eds: &ExtendedDataSquare,
1348 simulate_sampling_timeout: bool,
1349 ) -> Vec<Cid> {
1350 handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_sampling_timeout)]).await
1351 }
1352
1353 async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
1354 let sample = Sample::new(
1355 sample_id.row_index(),
1356 sample_id.column_index(),
1357 AxisType::Row,
1358 eds,
1359 )
1360 .unwrap();
1361
1362 let mut container = BytesMut::new();
1363 sample.encode(&mut container);
1364
1365 let block = Block {
1366 cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
1367 container: container.to_vec(),
1368 };
1369
1370 block.encode_to_vec()
1371 }
1372}