lumina_node/node/
subscriptions.rs

1//! Types and utilities related to header/blob/share subscriptions
2
3use std::ops::Deref;
4use std::sync::Arc;
5use std::time::Duration;
6
7use celestia_types::blob::BlobsAtHeight;
8use celestia_types::namespace_data::NamespaceData;
9use celestia_types::nmt::Namespace;
10use celestia_types::{Blob, ExtendedHeader, SharesAtHeight};
11use lumina_utils::executor::yield_now;
12use tokio::sync::broadcast::error::RecvError;
13use tokio::sync::{broadcast, mpsc};
14use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
15
16use crate::NodeError;
17use crate::p2p::{P2p, P2pError};
18use crate::store::{Store, StoreError};
19
20const SHWAP_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
21const HEADER_BROADCAST_CHANNEL_CAPACITY: usize = 16;
22
23/// Error thrown while processing the subscription
24#[derive(Debug, thiserror::Error)]
25pub enum SubscriptionError {
26    /// Error retrieving subscription item
27    #[error("Unable to receive subscription item at {height}: {source}")]
28    Node {
29        /// Height of the subscription item
30        height: u64,
31        /// Error that occurred
32        #[source]
33        source: NodeError,
34    },
35    /// Receiver lagged too far behind and the subscription will restart from the current head
36    #[error("Subscription item height already pruned from the store, skipping {0} items")]
37    Lagged(u64),
38}
39
40fn reconstruct_blobs(
41    namespace_data: NamespaceData,
42    header: &ExtendedHeader,
43) -> Result<BlobsAtHeight, P2pError> {
44    let shares = namespace_data
45        .rows()
46        .iter()
47        .flat_map(|row| row.shares.iter());
48    let blobs = Blob::reconstruct_all(shares, header.app_version())?;
49    Ok(BlobsAtHeight {
50        height: header.height(),
51        blobs,
52    })
53}
54/// As it gets notified about new header ranges being inserted, it generates a contiguous
55/// stream of Headers as they are synchronised by the node.
56#[derive(Debug)]
57pub(crate) struct BroadcastingStore<S> {
58    inner: Arc<S>,
59    sender: broadcast::Sender<ExtendedHeader>,
60    last_sent_height: Option<u64>,
61    pending: Vec<Vec<ExtendedHeader>>,
62}
63
64impl<S> BroadcastingStore<S>
65where
66    S: Store,
67{
68    pub fn new(store: Arc<S>) -> Self {
69        let (sender, _) = broadcast::channel(HEADER_BROADCAST_CHANNEL_CAPACITY);
70        BroadcastingStore {
71            inner: store,
72            sender,
73            last_sent_height: None,
74            pending: Default::default(),
75        }
76    }
77
78    pub fn clone_inner_store(&self) -> Arc<S> {
79        self.inner.clone()
80    }
81
82    /// Prepare BroadcastingStore for forwarding headers it received, by setting
83    /// the header height it should start at
84    pub(crate) fn init_broadcast(&mut self, head: ExtendedHeader) {
85        if self.last_sent_height.is_none() {
86            // First initialisation means Syncer has acquired a network head for the first time,
87            // start from there
88            self.last_sent_height = Some(head.height());
89            let _ = self.sender.send(head);
90        } else {
91            // Subsequent initialisations happen when syncer re-connects to the network
92            // this could have caused a gap in sent heights. This will get sorted out on
93            // next [`insert`].
94            self.pending.push(vec![head]);
95        }
96    }
97
98    pub(crate) fn subscribe(&self) -> broadcast::Receiver<ExtendedHeader> {
99        self.sender.subscribe()
100    }
101
102    pub(crate) async fn announce_insert(
103        &mut self,
104        range: Vec<ExtendedHeader>,
105    ) -> Result<(), StoreError> {
106        let last_sent_height = self
107            .last_sent_height
108            .expect("syncer should have initialised the height by now");
109
110        let Some(lowest_range_height) = range.first().map(|h| h.height()) else {
111            // Ignore empty range
112            return Ok(());
113        };
114
115        debug_assert!(
116            range.last().map(|h| h.height()).unwrap() < last_sent_height
117                || lowest_range_height > last_sent_height
118        );
119        if lowest_range_height < last_sent_height {
120            // We know range cannot cross last_sent_height, so either both ends are before or after.
121            // Ignore node syncing historical header ranges.
122            return self.inner.insert(range).await;
123        }
124
125        self.inner.insert(range.clone()).await?;
126
127        if last_sent_height + 1 == lowest_range_height {
128            self.send_range(range).await;
129        } else {
130            self.pending.push(range);
131        }
132
133        let mut i = 0;
134        while i < self.pending.len() {
135            let last_sent_height = self
136                .last_sent_height
137                .expect("last_sent_height should be initialised here");
138            let first_pending_height = self.pending[i]
139                .first()
140                .expect("header range shouldn't be empty")
141                .height();
142
143            if last_sent_height + 1 == first_pending_height {
144                let range = self.pending.swap_remove(i);
145                self.send_range(range).await;
146                i = 0;
147            } else {
148                i += 1;
149            }
150        }
151
152        Ok(())
153    }
154
155    async fn send_range(&mut self, headers: Vec<ExtendedHeader>) {
156        self.last_sent_height = Some(
157            headers
158                .last()
159                .expect("range shouldn't be empty here")
160                .height(),
161        );
162        for header in headers {
163            if self.sender.send(header).is_err() {
164                return; // no receivers - skip sending
165            }
166            // yield to allow receivers to go before the channel fills
167            yield_now().await;
168        }
169    }
170}
171
172impl<S> Deref for BroadcastingStore<S>
173where
174    S: Store,
175{
176    type Target = S;
177
178    fn deref(&self) -> &Self::Target {
179        self.inner.as_ref()
180    }
181}
182
183pub(crate) async fn forward_new_blobs(
184    namespace: Namespace,
185    tx: mpsc::Sender<Result<BlobsAtHeight, SubscriptionError>>,
186    mut header_receiver: broadcast::Receiver<ExtendedHeader>,
187    p2p: Arc<P2p>,
188) {
189    loop {
190        let header = match header_receiver.recv().await {
191            Ok(header) => header,
192            Err(RecvError::Lagged(skipped)) => {
193                let _ = tx.send(Err(SubscriptionError::Lagged(skipped))).await;
194                continue;
195            }
196            Err(RecvError::Closed) => {
197                return;
198            }
199        };
200
201        let blobs_or_error = p2p
202            .get_namespace_data(namespace, header.height(), Some(SHWAP_FETCH_TIMEOUT))
203            .await
204            .and_then(|namespace_data| reconstruct_blobs(namespace_data, &header))
205            .map_err(|e| SubscriptionError::Node {
206                height: header.height(),
207                source: e.into(),
208            });
209
210        if tx.send(blobs_or_error).await.is_err() {
211            return; // receiver dropped
212        }
213    }
214}
215
216pub(crate) async fn forward_new_shares(
217    namespace: Namespace,
218    tx: mpsc::Sender<Result<SharesAtHeight, SubscriptionError>>,
219    mut header_receiver: broadcast::Receiver<ExtendedHeader>,
220    p2p: Arc<P2p>,
221) {
222    loop {
223        let header = match header_receiver.recv().await {
224            Ok(header) => header,
225            Err(RecvError::Lagged(skipped)) => {
226                let _ = tx.send(Err(SubscriptionError::Lagged(skipped))).await;
227                continue;
228            }
229            Err(RecvError::Closed) => {
230                return;
231            }
232        };
233
234        let shares_or_error = match p2p
235            .get_namespace_data(namespace, header.height(), Some(SHWAP_FETCH_TIMEOUT))
236            .await
237        {
238            Ok(namespace_data) => Ok(SharesAtHeight {
239                height: header.height(),
240                shares: namespace_data
241                    .into_inner()
242                    .into_iter()
243                    .flat_map(|row| row.shares.into_iter())
244                    .collect(),
245            }),
246            Err(e) => Err(SubscriptionError::Node {
247                height: header.height(),
248                source: e.into(),
249            }),
250        };
251
252        if tx.send(shares_or_error).await.is_err() {
253            return; // receiver dropped
254        }
255    }
256}
257
258impl From<BroadcastStreamRecvError> for SubscriptionError {
259    fn from(BroadcastStreamRecvError::Lagged(skipped): BroadcastStreamRecvError) -> Self {
260        SubscriptionError::Lagged(skipped)
261    }
262}