lumina_node/node/
subscriptions.rs1use std::ops::Deref;
4use std::sync::Arc;
5use std::time::Duration;
6
7use celestia_types::blob::BlobsAtHeight;
8use celestia_types::namespace_data::NamespaceData;
9use celestia_types::nmt::Namespace;
10use celestia_types::{Blob, ExtendedHeader, SharesAtHeight};
11use lumina_utils::executor::yield_now;
12use tokio::sync::broadcast::error::RecvError;
13use tokio::sync::{broadcast, mpsc};
14use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
15
16use crate::NodeError;
17use crate::p2p::{P2p, P2pError};
18use crate::store::{Store, StoreError};
19
/// Timeout handed to every namespace data request issued on behalf of a subscription.
const SHWAP_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
/// Capacity of the broadcast channel that fans new headers out to subscribers.
const HEADER_BROADCAST_CHANNEL_CAPACITY: usize = 16;
22
23#[derive(Debug, thiserror::Error)]
25pub enum SubscriptionError {
26 #[error("Unable to receive subscription item at {height}: {source}")]
28 Node {
29 height: u64,
31 #[source]
33 source: NodeError,
34 },
35 #[error("Subscription item height already pruned from the store, skipping {0} items")]
37 Lagged(u64),
38}
39
40fn reconstruct_blobs(
41 namespace_data: NamespaceData,
42 header: &ExtendedHeader,
43) -> Result<BlobsAtHeight, P2pError> {
44 let shares = namespace_data
45 .rows()
46 .iter()
47 .flat_map(|row| row.shares.iter());
48 let blobs = Blob::reconstruct_all(shares, header.app_version())?;
49 Ok(BlobsAtHeight {
50 height: header.height(),
51 blobs,
52 })
53}
54#[derive(Debug)]
57pub(crate) struct BroadcastingStore<S> {
58 inner: Arc<S>,
59 sender: broadcast::Sender<ExtendedHeader>,
60 last_sent_height: Option<u64>,
61 pending: Vec<Vec<ExtendedHeader>>,
62}
63
64impl<S> BroadcastingStore<S>
65where
66 S: Store,
67{
68 pub fn new(store: Arc<S>) -> Self {
69 let (sender, _) = broadcast::channel(HEADER_BROADCAST_CHANNEL_CAPACITY);
70 BroadcastingStore {
71 inner: store,
72 sender,
73 last_sent_height: None,
74 pending: Default::default(),
75 }
76 }
77
78 pub fn clone_inner_store(&self) -> Arc<S> {
79 self.inner.clone()
80 }
81
82 pub(crate) fn init_broadcast(&mut self, head: ExtendedHeader) {
85 if self.last_sent_height.is_none() {
86 self.last_sent_height = Some(head.height());
89 let _ = self.sender.send(head);
90 } else {
91 self.pending.push(vec![head]);
95 }
96 }
97
98 pub(crate) fn subscribe(&self) -> broadcast::Receiver<ExtendedHeader> {
99 self.sender.subscribe()
100 }
101
102 pub(crate) async fn announce_insert(
103 &mut self,
104 range: Vec<ExtendedHeader>,
105 ) -> Result<(), StoreError> {
106 let last_sent_height = self
107 .last_sent_height
108 .expect("syncer should have initialised the height by now");
109
110 let Some(lowest_range_height) = range.first().map(|h| h.height()) else {
111 return Ok(());
113 };
114
115 debug_assert!(
116 range.last().map(|h| h.height()).unwrap() < last_sent_height
117 || lowest_range_height > last_sent_height
118 );
119 if lowest_range_height < last_sent_height {
120 return self.inner.insert(range).await;
123 }
124
125 self.inner.insert(range.clone()).await?;
126
127 if last_sent_height + 1 == lowest_range_height {
128 self.send_range(range).await;
129 } else {
130 self.pending.push(range);
131 }
132
133 let mut i = 0;
134 while i < self.pending.len() {
135 let last_sent_height = self
136 .last_sent_height
137 .expect("last_sent_height should be initialised here");
138 let first_pending_height = self.pending[i]
139 .first()
140 .expect("header range shouldn't be empty")
141 .height();
142
143 if last_sent_height + 1 == first_pending_height {
144 let range = self.pending.swap_remove(i);
145 self.send_range(range).await;
146 i = 0;
147 } else {
148 i += 1;
149 }
150 }
151
152 Ok(())
153 }
154
155 async fn send_range(&mut self, headers: Vec<ExtendedHeader>) {
156 self.last_sent_height = Some(
157 headers
158 .last()
159 .expect("range shouldn't be empty here")
160 .height(),
161 );
162 for header in headers {
163 if self.sender.send(header).is_err() {
164 return; }
166 yield_now().await;
168 }
169 }
170}
171
172impl<S> Deref for BroadcastingStore<S>
173where
174 S: Store,
175{
176 type Target = S;
177
178 fn deref(&self) -> &Self::Target {
179 self.inner.as_ref()
180 }
181}
182
183pub(crate) async fn forward_new_blobs(
184 namespace: Namespace,
185 tx: mpsc::Sender<Result<BlobsAtHeight, SubscriptionError>>,
186 mut header_receiver: broadcast::Receiver<ExtendedHeader>,
187 p2p: Arc<P2p>,
188) {
189 loop {
190 let header = match header_receiver.recv().await {
191 Ok(header) => header,
192 Err(RecvError::Lagged(skipped)) => {
193 let _ = tx.send(Err(SubscriptionError::Lagged(skipped))).await;
194 continue;
195 }
196 Err(RecvError::Closed) => {
197 return;
198 }
199 };
200
201 let blobs_or_error = p2p
202 .get_namespace_data(namespace, header.height(), Some(SHWAP_FETCH_TIMEOUT))
203 .await
204 .and_then(|namespace_data| reconstruct_blobs(namespace_data, &header))
205 .map_err(|e| SubscriptionError::Node {
206 height: header.height(),
207 source: e.into(),
208 });
209
210 if tx.send(blobs_or_error).await.is_err() {
211 return; }
213 }
214}
215
216pub(crate) async fn forward_new_shares(
217 namespace: Namespace,
218 tx: mpsc::Sender<Result<SharesAtHeight, SubscriptionError>>,
219 mut header_receiver: broadcast::Receiver<ExtendedHeader>,
220 p2p: Arc<P2p>,
221) {
222 loop {
223 let header = match header_receiver.recv().await {
224 Ok(header) => header,
225 Err(RecvError::Lagged(skipped)) => {
226 let _ = tx.send(Err(SubscriptionError::Lagged(skipped))).await;
227 continue;
228 }
229 Err(RecvError::Closed) => {
230 return;
231 }
232 };
233
234 let shares_or_error = match p2p
235 .get_namespace_data(namespace, header.height(), Some(SHWAP_FETCH_TIMEOUT))
236 .await
237 {
238 Ok(namespace_data) => Ok(SharesAtHeight {
239 height: header.height(),
240 shares: namespace_data
241 .into_inner()
242 .into_iter()
243 .flat_map(|row| row.shares.into_iter())
244 .collect(),
245 }),
246 Err(e) => Err(SubscriptionError::Node {
247 height: header.height(),
248 source: e.into(),
249 }),
250 };
251
252 if tx.send(shares_or_error).await.is_err() {
253 return; }
255 }
256}
257
258impl From<BroadcastStreamRecvError> for SubscriptionError {
259 fn from(BroadcastStreamRecvError::Lagged(skipped): BroadcastStreamRecvError) -> Self {
260 SubscriptionError::Lagged(skipped)
261 }
262}