use std::collections::HashSet;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use rand::Rng;
use tendermint::Time;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};
use web_time::{Duration, Instant};

use lumina_utils::executor::{spawn, JoinHandle};

use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pError};
use crate::store::{BlockRanges, SamplingStatus, Store, StoreError};

const MAX_SAMPLES_NEEDED: usize = 16;
const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10);

type Result<T, E = DaserError> = std::result::Result<T, E>;

/// Representation of the errors that can occur in the [`Daser`] component.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
    /// An error propagated from the [`P2p`] component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),
}

/// Component that performs data availability sampling of blocks in the store.
pub(crate) struct Daser {
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
}

/// Arguments used to construct a [`Daser`].
pub(crate) struct DaserArgs<S>
where
    S: Store,
{
    /// Handle to the [`P2p`] component.
    pub(crate) p2p: Arc<P2p>,
    /// Headers store.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Size of the sampling window.
    pub(crate) sampling_window: Duration,
}

impl Daser {
    /// Create and start the [`Daser`] worker task.
    pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
    where
        S: Store + 'static,
    {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let mut worker = Worker::new(args, cancellation_token.child_token())?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Daser stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalDaserError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Daser {
            cancellation_token,
            join_handle,
        })
    }

    /// Signal the worker task to stop.
    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

    /// Wait until the worker task finishes.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }
}

impl Drop for Daser {
    fn drop(&mut self) {
        self.stop();
    }
}
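
// A minimal usage sketch, mirroring how the tests below wire the component up.
// `DEFAULT_SAMPLING_WINDOW` and the surrounding setup are assumed to be provided
// elsewhere in the crate; this is illustrative, not part of the public API:
//
//     let daser = Daser::start(DaserArgs {
//         p2p: p2p.clone(),
//         store: store.clone(),
//         event_pub: events.publisher(),
//         sampling_window: DEFAULT_SAMPLING_WINDOW,
//     })?;
//
//     // ... and on shutdown:
//     daser.stop();
//     daser.join().await;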

struct Worker<S>
where
    S: Store + 'static,
{
    cancellation_token: CancellationToken,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    max_samples_needed: usize,
    /// Sampling tasks currently in flight, each resolving to `(height, accepted)`.
    sampling_futs: FuturesUnordered<BoxFuture<'static, Result<(u64, bool)>>>,
    /// Heights that are waiting to be sampled.
    queue: BlockRanges,
    /// Heights already handled in this session (sampled or skipped).
    done: BlockRanges,
    /// Heights that are currently being sampled.
    ongoing: BlockRanges,
    /// Highest queued height seen so far, used to detect a new head.
    prev_head: Option<u64>,
    sampling_window: Duration,
}
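
// An illustrative walk-through of the range bookkeeping above, using hypothetical
// numbers (see `populate_queue` below): if heights 1..=100 are stored, 1..=50 were
// already accepted, 60..=70 are `done` for this session and 80..=80 is `ongoing`,
// then the resulting `queue` is 51..=59, 71..=79 and 81..=100, and sampling proceeds
// from its head (100) downwards.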

impl<S> Worker<S>
where
    S: Store,
{
    fn new(args: DaserArgs<S>, cancellation_token: CancellationToken) -> Result<Worker<S>> {
        Ok(Worker {
            cancellation_token,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            max_samples_needed: MAX_SAMPLES_NEEDED,
            sampling_futs: FuturesUnordered::new(),
            queue: BlockRanges::default(),
            done: BlockRanges::default(),
            ongoing: BlockRanges::default(),
            prev_head: None,
            sampling_window: args.sampling_window,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Daser stopped");
        Ok(())
    }

    async fn connecting_event_loop(&mut self) {
        debug!("Entering connecting_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
            return;
        }

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers > 0 {
                        break;
                    }
                }
            }
        }
    }

    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        // Clone the store handle so the `wait_new_head` future doesn't borrow `self`.
        let store = self.store.clone();
        let mut wait_new_head = store.wait_new_head();

        self.populate_queue().await?;

        loop {
            // A new head appeared in the queue: schedule it immediately so the
            // freshest block gets sampled first.
            if let Some(queue_head) = self.queue.head() {
                if queue_head > self.prev_head.unwrap_or(0) {
                    self.schedule_next_sample_block().await?;
                    self.prev_head = Some(queue_head);
                }
            }

            // Nothing is being sampled right now, so schedule the next block.
            if self.sampling_futs.is_empty() {
                self.schedule_next_sample_block().await?;
            }

            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                Some(res) = self.sampling_futs.next() => {
                    // A sampling task finished: persist its result.
                    let (height, accepted) = res?;

                    let status = if accepted {
                        SamplingStatus::Accepted
                    } else {
                        SamplingStatus::Rejected
                    };

                    self.store
                        .update_sampling_metadata(height, status, Vec::new())
                        .await?;

                    self.ongoing.remove_relaxed(height..=height).expect("invalid height");
                    self.done.insert_relaxed(height..=height).expect("invalid height");
                },
                _ = &mut wait_new_head => {
                    wait_new_head = store.wait_new_head();
                    self.populate_queue().await?;
                }
            }
        }

        // Reset the session state; it is rebuilt on the next connection.
        self.sampling_futs.clear();
        self.queue = BlockRanges::default();
        self.ongoing = BlockRanges::default();
        self.done = BlockRanges::default();
        self.prev_head = None;

        Ok(())
    }

    async fn schedule_next_sample_block(&mut self) -> Result<()> {
        // Pop heights from the queue until we find one whose header is in the store.
        let header = loop {
            let Some(height) = self.queue.pop_head() else {
                return Ok(());
            };

            match self.store.get_by_height(height).await {
                Ok(header) => break header,
                Err(StoreError::NotFound) => {
                    // The store no longer has this header; rebuild the queue and retry.
                    self.populate_queue().await?;
                }
                Err(e) => return Err(e.into()),
            }
        };

        let height = header.height().value();
        let square_width = header.dah.square_width();

        // If the block is outside the sampling window, every block below it is too,
        // so mark all of them as done.
        if !self.in_sampling_window(header.time()) {
            self.queue
                .remove_relaxed(1..=height)
                .expect("invalid height");
            self.done
                .insert_relaxed(1..=height)
                .expect("invalid height");
            return Ok(());
        }

        // Pick random share coordinates and convert them to sample CIDs.
        let share_indexes = random_indexes(square_width, self.max_samples_needed);

        let cids = share_indexes
            .iter()
            .map(|(row, col)| sample_cid(*row, *col, height))
            .collect::<Result<Vec<_>, _>>()?;

        // Record the CIDs that are about to be requested, with an `Unknown` status.
        self.store
            .update_sampling_metadata(height, SamplingStatus::Unknown, cids)
            .await?;

        let p2p = self.p2p.clone();
        let event_pub = self.event_pub.clone();

        let fut = async move {
            let now = Instant::now();

            event_pub.send(NodeEvent::SamplingStarted {
                height,
                square_width,
                shares: share_indexes.iter().copied().collect(),
            });

            // Request all samples concurrently.
            let mut futs = share_indexes
                .into_iter()
                .map(|(row, col)| {
                    let p2p = p2p.clone();

                    async move {
                        let res = p2p
                            .get_sample(row, col, height, Some(GET_SAMPLE_TIMEOUT))
                            .await;
                        (row, col, res)
                    }
                })
                .collect::<FuturesUnordered<_>>();

            let mut block_accepted = true;

            while let Some((row, column, res)) = futs.next().await {
                let share_accepted = match res {
                    Ok(_) => true,
                    // A timeout means the sample could not be retrieved in time,
                    // so the share is treated as not accepted.
                    Err(P2pError::BitswapQueryTimeout) => false,
                    Err(e) => return Err(e.into()),
                };

                block_accepted &= share_accepted;

                event_pub.send(NodeEvent::ShareSamplingResult {
                    height,
                    square_width,
                    row,
                    column,
                    accepted: share_accepted,
                });
            }

            event_pub.send(NodeEvent::SamplingFinished {
                height,
                accepted: block_accepted,
                took: now.elapsed(),
            });

            Ok((height, block_accepted))
        }
        .boxed();

        self.sampling_futs.push(fut);
        self.ongoing
            .insert_relaxed(height..=height)
            .expect("invalid height");

        Ok(())
    }
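
    // To make `schedule_next_sample_block` above concrete (hypothetical numbers):
    // for a block at height 100 with an 8x8 EDS there are 64 shares, so 16 random
    // (row, col) coordinates are drawn, converted to sample CIDs, recorded with
    // `SamplingStatus::Unknown`, and requested over bitswap with a 10 s timeout
    // each; the block is accepted only if every requested sample arrives.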

    /// Rebuild the sampling queue: everything that is stored but not yet accepted,
    /// already handled in this session, or currently being sampled.
    async fn populate_queue(&mut self) -> Result<()> {
        let stored = self.store.get_stored_header_ranges().await?;
        let accepted = self.store.get_accepted_sampling_ranges().await?;

        self.queue = stored - accepted - &self.done - &self.ongoing;

        Ok(())
    }

    /// Returns whether a block with the given timestamp is within the sampling window.
    fn in_sampling_window(&self, time: Time) -> bool {
        let now = Time::now();

        // A block from the future (e.g. minor clock drift) is within the window.
        if now < time {
            return true;
        }

        let Ok(age) = now.duration_since(time) else {
            return false;
        };

        age <= self.sampling_window
    }
}

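// A worked example for `random_indexes` below (illustrative sizes): with
// MAX_SAMPLES_NEEDED = 16, a 2x2 EDS has only 4 shares, so all 4 coordinates are
// returned; a 32x32 EDS has 1024 shares, so 16 distinct random (row, col) pairs
// are drawn instead.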
fn random_indexes(square_width: u16, max_samples_needed: usize) -> HashSet<(u16, u16)> {
    let samples_in_block = usize::from(square_width).pow(2);

    // If the block has no more shares than we need, sample all of them.
    if samples_in_block <= max_samples_needed {
        return (0..square_width)
            .flat_map(|row| (0..square_width).map(move |col| (row, col)))
            .collect();
    }

    let mut indexes = HashSet::with_capacity(max_samples_needed);
    let mut rng = rand::thread_rng();

    while indexes.len() < max_samples_needed {
        let row = rng.gen::<u16>() % square_width;
        let col = rng.gen::<u16>() % square_width;
        indexes.insert((row, col));
    }

    indexes
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{EventChannel, EventSubscriber};
    use crate::node::DEFAULT_SAMPLING_WINDOW;
    use crate::p2p::shwap::convert_cid;
    use crate::p2p::P2pCmd;
    use crate::store::InMemoryStore;
    use crate::test_utils::MockP2pHandle;
    use bytes::BytesMut;
    use celestia_proto::bitswap::Block;
    use celestia_types::consts::appconsts::AppVersion;
    use celestia_types::sample::{Sample, SampleId};
    use celestia_types::test_utils::{generate_dummy_eds, ExtendedHeaderGenerator};
    use celestia_types::{AxisType, DataAvailabilityHeader, ExtendedDataSquare};
    use cid::Cid;
    use lumina_utils::test_utils::async_test;
    use lumina_utils::time::sleep;
    use prost::Message;
    use std::collections::HashMap;
    use std::time::Duration;

    // The request number at which a timed-out (invalid) sample is simulated.
    const INVALID_SHARE_REQ_NUM: usize = 2;

    #[async_test]
    async fn received_valid_samples() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 16, false).await;
    }

    #[async_test]
    async fn received_invalid_sample() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();
        let mut event_sub = events.subscribe();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 2, false).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 4, true).await;
        gen_and_sample_block(&mut handle, &mut gen, &store, &mut event_sub, 8, false).await;
    }

    #[async_test]
    async fn backward_dasing() {
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());
        let events = EventChannel::new();

        let _daser = Daser::start(DaserArgs {
            event_pub: events.publisher(),
            p2p: Arc::new(mock),
            store: store.clone(),
            sampling_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        let mut gen = ExtendedHeaderGenerator::new();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;

        let mut edses = Vec::new();
        let mut headers = Vec::new();

        for _ in 0..20 {
            let eds = generate_dummy_eds(2, AppVersion::V2);
            let dah = DataAvailabilityHeader::from_eds(&eds);
            let header = gen.next_with_dah(dah);

            edses.push(eds);
            headers.push(header);
        }

        // Insert heights 5..=10.
        store.insert(headers[4..=9].to_vec()).await.unwrap();

        // Daser samples backwards, starting from the highest stored block (10).
        handle_get_shwap_cid(&mut handle, 10, &edses[9], false).await;

        // Sample block 9.
        handle_get_shwap_cid(&mut handle, 9, &edses[8], false).await;

        sleep(Duration::from_millis(10)).await;

        // Insert heights 16..=20.
        store.insert(headers[15..=19].to_vec()).await.unwrap();

        sleep(Duration::from_millis(10)).await;

        // Block 8 was already scheduled, and the new head (20) gets scheduled
        // alongside it, so both are sampled concurrently.
        handle_concurrent_get_shwap_cid(
            &mut handle,
            [(8, &edses[7], false), (20, &edses[19], false)],
        )
        .await;

        // Sample block 19, simulating a failure so it gets retried later.
        handle_get_shwap_cid(&mut handle, 19, &edses[18], true).await;

        handle.announce_all_peers_disconnected();

        // Daser may have scheduled requests before it noticed the disconnection,
        // so answer any outstanding ones with a timeout.
        while let Some(cmd) = handle.try_recv_cmd().await {
            match cmd {
                P2pCmd::GetShwapCid { respond_to, .. } => {
                    let _ = respond_to.send(Err(P2pError::BitswapQueryTimeout));
                }
                cmd => panic!("Unexpected command: {cmd:?}"),
            }
        }

        handle.expect_no_cmd().await;

        handle.announce_peer_connected();

        // Block 19 was rejected, so it is sampled again after reconnecting.
        handle_get_shwap_cid(&mut handle, 19, &edses[18], false).await;

        // Sample blocks 18 down to 16.
        for height in (16..=18).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        // Sample blocks 7 down to 5.
        for height in (5..=7).rev() {
            let idx = height as usize - 1;
            handle_get_shwap_cid(&mut handle, height, &edses[idx], false).await;
        }

        handle.expect_no_cmd().await;

        // Generate a new block (21) and sample it.
        let eds = generate_dummy_eds(2, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        store.insert(header).await.unwrap();

        handle_get_shwap_cid(&mut handle, 21, &eds, false).await;

        handle.expect_no_cmd().await;
    }

    async fn gen_and_sample_block(
        handle: &mut MockP2pHandle,
        gen: &mut ExtendedHeaderGenerator,
        store: &InMemoryStore,
        event_sub: &mut EventSubscriber,
        square_width: usize,
        simulate_invalid_sampling: bool,
    ) {
        let eds = generate_dummy_eds(square_width, AppVersion::V2);
        let dah = DataAvailabilityHeader::from_eds(&eds);
        let header = gen.next_with_dah(dah);
        let height = header.height().value();

        store.insert(header).await.unwrap();

        let cids = handle_get_shwap_cid(handle, height, &eds, simulate_invalid_sampling).await;
        handle.expect_no_cmd().await;

        let mut sampling_metadata = store.get_sampling_metadata(height).await.unwrap().unwrap();
        sampling_metadata.cids.sort();

        if simulate_invalid_sampling {
            assert_eq!(sampling_metadata.status, SamplingStatus::Rejected);
        } else {
            assert_eq!(sampling_metadata.status, SamplingStatus::Accepted);
        }

        assert_eq!(&sampling_metadata.cids, &cids);

        // The first event must be `SamplingStarted` with the same CIDs as the store.
        let mut remaining_shares = match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingStarted {
                height: ev_height,
                square_width,
                shares,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(square_width, eds.square_width());

                let mut cids = shares
                    .iter()
                    .map(|(row, col)| sample_cid(*row, *col, height).unwrap())
                    .collect::<Vec<_>>();
                cids.sort();
                assert_eq!(&sampling_metadata.cids, &cids);

                shares.into_iter().collect::<HashSet<_>>()
            }
            ev => panic!("Unexpected event: {ev}"),
        };

        // Then one `ShareSamplingResult` per requested share.
        for i in 1..=remaining_shares.len() {
            match event_sub.try_recv().unwrap().event {
                NodeEvent::ShareSamplingResult {
                    height: ev_height,
                    square_width,
                    row,
                    column,
                    accepted,
                } => {
                    assert_eq!(ev_height, height);
                    assert_eq!(square_width, eds.square_width());
                    assert_eq!(
                        accepted,
                        !(simulate_invalid_sampling && i == INVALID_SHARE_REQ_NUM)
                    );
                    assert!(remaining_shares.remove(&(row, column)));
                }
                ev => panic!("Unexpected event: {ev}"),
            }
        }

        assert!(remaining_shares.is_empty());

        // And finally `SamplingFinished`.
        match event_sub.try_recv().unwrap().event {
            NodeEvent::SamplingFinished {
                height: ev_height,
                accepted,
                took,
            } => {
                assert_eq!(ev_height, height);
                assert_eq!(accepted, !simulate_invalid_sampling);
                assert_ne!(took, Duration::default());
            }
            ev => panic!("Unexpected event: {ev}"),
        }

        assert!(event_sub.try_recv().is_err());
    }

    /// Responds to every sample request expected for the given heights, optionally
    /// simulating a single timed-out request per height. Returns the requested CIDs, sorted.
    async fn handle_concurrent_get_shwap_cid<const N: usize>(
        handle: &mut MockP2pHandle,
        handling_args: [(u64, &ExtendedDataSquare, bool); N],
    ) -> Vec<Cid> {
        struct Info<'a> {
            eds: &'a ExtendedDataSquare,
            simulate_invalid_sampling: bool,
            needed_samples: usize,
            requests_count: usize,
        }

        let mut infos = handling_args
            .into_iter()
            .map(|(height, eds, simulate_invalid_sampling)| {
                let square_width = eds.square_width() as usize;
                let needed_samples = (square_width * square_width).min(MAX_SAMPLES_NEEDED);

                (
                    height,
                    Info {
                        eds,
                        simulate_invalid_sampling,
                        needed_samples,
                        requests_count: 0,
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        let needed_samples_sum = infos.values().map(|info| info.needed_samples).sum();
        let mut cids = Vec::with_capacity(needed_samples_sum);

        for _ in 0..needed_samples_sum {
            let (cid, respond_to) = handle.expect_get_shwap_cid().await;
            cids.push(cid);

            let sample_id: SampleId = cid.try_into().unwrap();
            let info = infos
                .get_mut(&sample_id.block_height())
                .unwrap_or_else(|| panic!("Unexpected height: {}", sample_id.block_height()));

            info.requests_count += 1;

            if info.simulate_invalid_sampling && info.requests_count == INVALID_SHARE_REQ_NUM {
                // Simulate a failed sampling by timing out this request.
                respond_to.send(Err(P2pError::BitswapQueryTimeout)).unwrap();
                continue;
            }

            let sample = gen_sample_of_cid(sample_id, info.eds).await;
            respond_to.send(Ok(sample)).unwrap();
        }

        cids.sort();
        cids
    }

    async fn handle_get_shwap_cid(
        handle: &mut MockP2pHandle,
        height: u64,
        eds: &ExtendedDataSquare,
        simulate_invalid_sampling: bool,
    ) -> Vec<Cid> {
        handle_concurrent_get_shwap_cid(handle, [(height, eds, simulate_invalid_sampling)]).await
    }

    async fn gen_sample_of_cid(sample_id: SampleId, eds: &ExtendedDataSquare) -> Vec<u8> {
        let sample = Sample::new(
            sample_id.row_index(),
            sample_id.column_index(),
            AxisType::Row,
            eds,
        )
        .unwrap();

        let mut container = BytesMut::new();
        sample.encode(&mut container);

        let block = Block {
            cid: convert_cid(&sample_id.into()).unwrap().to_bytes(),
            container: container.to_vec(),
        };

        block.encode_to_vec()
    }
}