lumina_node/
events.rs

1//! Events generated by `Node`
2
3use std::fmt;
4use std::panic::Location;
5use std::time::Duration;
6
7use libp2p::PeerId;
8use lumina_utils::time::SystemTime;
9use serde::Serialize;
10use tokio::sync::broadcast;
11
/// Capacity of the broadcast channel backing [`EventChannel`].
///
/// If a subscriber falls more than this many events behind, the oldest
/// events are dropped for it and the subscriber observes a `Lagged`
/// notification (skipped internally by [`EventSubscriber::recv`]).
const EVENT_CHANNEL_CAPACITY: usize = 1024;
13
/// An error returned from the [`EventSubscriber::recv`].
///
/// Note: a lagging (slow) subscriber is *not* an error — `recv` skips
/// lag notifications internally and keeps waiting for the next event.
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
    /// Node and all its event senders are closed.
    #[error("Event channel closed")]
    Closed,
}
21
/// An error returned from the [`EventSubscriber::try_recv`].
///
/// Note: a lagging (slow) subscriber is *not* an error — `try_recv` skips
/// lag notifications internally and retries immediately.
#[derive(Debug, thiserror::Error)]
pub enum TryRecvError {
    /// The event channel is currently empty.
    #[error("Event channel empty")]
    Empty,
    /// Node and all its event senders are closed.
    #[error("Event channel closed")]
    Closed,
}
32
/// A channel which users can subscribe for events.
///
/// Owns the sending half of a broadcast channel; [`EventPublisher`]s are
/// cheap clones of it and [`EventSubscriber`]s are created from it.
#[derive(Debug)]
pub(crate) struct EventChannel {
    // Sender handle; cloned into publishers and used to mint subscribers.
    tx: broadcast::Sender<NodeEventInfo>,
}
38
/// `EventPublisher` is used to broadcast events generated by [`Node`] to [`EventSubscriber`]s.
///
/// Cloning produces another handle to the same underlying channel.
///
/// [`Node`]: crate::node::Node
#[derive(Debug, Clone)]
pub(crate) struct EventPublisher {
    // Shared sending half of the event broadcast channel.
    tx: broadcast::Sender<NodeEventInfo>,
}
46
/// `EventSubscriber` can be used by users to receive events from [`Node`].
///
/// Each subscriber has its own queue of up to `EVENT_CHANNEL_CAPACITY`
/// pending events; if it falls further behind, the oldest events are dropped.
///
/// [`Node`]: crate::node::Node
#[derive(Debug)]
pub struct EventSubscriber {
    // Receiving half bound to this subscriber.
    rx: broadcast::Receiver<NodeEventInfo>,
}
54
55impl EventChannel {
56    /// Create a new `EventChannel`.
57    pub(crate) fn new() -> EventChannel {
58        let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
59        EventChannel { tx }
60    }
61
62    /// Creates a new [`EventPublisher`].
63    pub(crate) fn publisher(&self) -> EventPublisher {
64        EventPublisher {
65            tx: self.tx.clone(),
66        }
67    }
68
69    /// Creates a new [`EventSubscriber`].
70    pub(crate) fn subscribe(&self) -> EventSubscriber {
71        EventSubscriber {
72            rx: self.tx.subscribe(),
73        }
74    }
75}
76
77impl Default for EventChannel {
78    fn default() -> Self {
79        EventChannel::new()
80    }
81}
82
83impl EventPublisher {
84    pub(crate) fn send(&self, event: NodeEvent) {
85        let time = SystemTime::now();
86        let location: &'static Location<'static> = Location::caller();
87
88        // Error is produced if there aren't any subscribers. Since this is
89        // a valid case, we ignore the error.
90        let _ = self.tx.send(NodeEventInfo {
91            event,
92            time,
93            file_path: location.file(),
94            file_line: location.line(),
95        });
96    }
97}
98
99impl EventSubscriber {
100    /// Receive an event from [`Node`].
101    ///
102    /// # Cancel safety
103    ///
104    /// This method is cancel-safe.
105    ///
106    /// [`Node`]: crate::node::Node
107    pub async fn recv(&mut self) -> Result<NodeEventInfo, RecvError> {
108        loop {
109            match self.rx.recv().await {
110                Ok(val) => return Ok(val),
111                Err(broadcast::error::RecvError::Lagged(_)) => {
112                    // Slow consumer. We will receive a message on the next call.
113                    continue;
114                }
115                Err(broadcast::error::RecvError::Closed) => return Err(RecvError::Closed),
116            }
117        }
118    }
119
120    /// Attempts to receive an already queued event from [`Node`] without awaiting.
121    ///
122    /// If no events are queued, `Err(TryRecvError::Empty)` is returned.
123    ///
124    /// [`Node`]: crate::node::Node
125    pub fn try_recv(&mut self) -> Result<NodeEventInfo, TryRecvError> {
126        loop {
127            match self.rx.try_recv() {
128                Ok(val) => return Ok(val),
129                Err(broadcast::error::TryRecvError::Lagged(_)) => {
130                    // Slow consumer. We will receive a message on the next call.
131                    continue;
132                }
133                Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
134                Err(broadcast::error::TryRecvError::Closed) => return Err(TryRecvError::Closed),
135            }
136        }
137    }
138}
139
/// This struct includes the [`NodeEvent`] and some extra information about the event.
#[derive(Debug, Clone, Serialize)]
pub struct NodeEventInfo {
    /// The event.
    pub event: NodeEvent,
    // On wasm32 the time is serialized with a custom function so that
    // Javascript receives milliseconds as `f64` — see `serialize_system_time`.
    #[cfg_attr(
        target_arch = "wasm32",
        serde(serialize_with = "serialize_system_time")
    )]
    /// When the event was generated.
    pub time: SystemTime,
    /// Which file generated the event.
    pub file_path: &'static str,
    /// Which line in the file generated the event.
    pub file_line: u32,
}
156
/// The events that [`Node`] can generate.
///
/// [`Node`]: crate::node::Node
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
// Internally tagged: each event serializes to an object whose `type` field
// is the snake_case variant name, with the variant's fields inlined.
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum NodeEvent {
    /// Node is connecting to bootnodes
    ConnectingToBootnodes,

    /// Peer just connected
    PeerConnected {
        // `id` is serialized via its string representation.
        #[serde(serialize_with = "serialize_as_string")]
        /// The ID of the peer.
        id: PeerId,
        /// Whether peer was in the trusted list or not.
        trusted: bool,
    },

    /// Peer just disconnected
    PeerDisconnected {
        // `id` is serialized via its string representation.
        #[serde(serialize_with = "serialize_as_string")]
        /// The ID of the peer.
        id: PeerId,
        /// Whether peer was in the trusted list or not.
        trusted: bool,
    },

    /// Sampling just started.
    SamplingStarted {
        /// The block height that will be sampled.
        height: u64,
        /// The square width of the block.
        square_width: u16,
        /// The coordinates of the shares that will be sampled.
        shares: Vec<(u16, u16)>,
    },

    /// Share sampling result.
    ShareSamplingResult {
        /// The block height of the share.
        height: u64,
        /// The square width of the block.
        square_width: u16,
        /// The row of the share.
        row: u16,
        /// The column of the share.
        column: u16,
        /// Share sampling timed out.
        timed_out: bool,
    },

    /// Sampling result.
    SamplingResult {
        /// The block height that was sampled.
        height: u64,
        /// Sampling timed out.
        timed_out: bool,
        /// How much time sampling took.
        took: Duration,
    },

    /// Data sampling fatal error.
    FatalDaserError {
        /// A human readable error.
        error: String,
    },

    /// A new header was added from HeaderSub.
    AddedHeaderFromHeaderSub {
        /// The height of the header.
        height: u64,
    },

    /// Fetching header of network head just started.
    FetchingHeadHeaderStarted,

    /// Fetching header of network head just finished.
    FetchingHeadHeaderFinished {
        /// The height of the network head.
        height: u64,
        /// How much time fetching took.
        took: Duration,
    },

    /// Fetching headers of a specific block range just started.
    FetchingHeadersStarted {
        /// Start of the range.
        from_height: u64,
        /// End of the range (inclusive).
        to_height: u64,
    },

    /// Fetching headers of a specific block range just finished.
    FetchingHeadersFinished {
        /// Start of the range.
        from_height: u64,
        /// End of the range (inclusive).
        to_height: u64,
        /// How much time fetching took.
        took: Duration,
    },

    /// Fetching headers of a specific block range just failed.
    FetchingHeadersFailed {
        /// Start of the range.
        from_height: u64,
        /// End of the range (inclusive).
        to_height: u64,
        /// A human readable error.
        error: String,
        /// How much time fetching took.
        took: Duration,
    },

    /// Header syncing fatal error.
    FatalSyncerError {
        /// A human readable error.
        error: String,
    },

    /// Range of headers that were pruned.
    PrunedHeaders {
        /// Start of the range.
        from_height: u64,
        /// End of the range (inclusive).
        to_height: u64,
    },

    /// Pruning fatal error.
    FatalPrunerError {
        /// A human readable error.
        error: String,
    },

    /// Network was compromised.
    ///
    /// This happens when a valid bad encoding fraud proof is received.
    /// Ideally it would never happen, but protection needs to exist.
    /// In case of compromised network, syncing and data sampling will
    /// stop immediately.
    NetworkCompromised,

    /// Node stopped.
    NodeStopped,
}
304
305impl NodeEvent {
306    /// Returns `true` if the event indicates an error.
307    pub fn is_error(&self) -> bool {
308        match self {
309            NodeEvent::FatalDaserError { .. }
310            | NodeEvent::FatalSyncerError { .. }
311            | NodeEvent::FatalPrunerError { .. }
312            | NodeEvent::FetchingHeadersFailed { .. }
313            | NodeEvent::NetworkCompromised => true,
314            NodeEvent::ConnectingToBootnodes
315            | NodeEvent::PeerConnected { .. }
316            | NodeEvent::PeerDisconnected { .. }
317            | NodeEvent::SamplingStarted { .. }
318            | NodeEvent::ShareSamplingResult { .. }
319            | NodeEvent::SamplingResult { .. }
320            | NodeEvent::AddedHeaderFromHeaderSub { .. }
321            | NodeEvent::FetchingHeadHeaderStarted
322            | NodeEvent::FetchingHeadHeaderFinished { .. }
323            | NodeEvent::FetchingHeadersStarted { .. }
324            | NodeEvent::FetchingHeadersFinished { .. }
325            | NodeEvent::PrunedHeaders { .. }
326            | NodeEvent::NodeStopped => false,
327        }
328    }
329}
330
331impl fmt::Display for NodeEvent {
332    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
333        match self {
334            NodeEvent::ConnectingToBootnodes => {
335                write!(f, "Connecting to bootnodes")
336            }
337            NodeEvent::PeerConnected { id, trusted } => {
338                if *trusted {
339                    write!(f, "Trusted peer connected: {id}")
340                } else {
341                    write!(f, "Peer connected: {id}")
342                }
343            }
344            NodeEvent::PeerDisconnected { id, trusted } => {
345                if *trusted {
346                    write!(f, "Trusted peer disconnected: {id}")
347                } else {
348                    write!(f, "Peer disconnected: {id}")
349                }
350            }
351            NodeEvent::SamplingStarted {
352                height,
353                square_width,
354                shares,
355            } => {
356                write!(f, "Sampling of block {height} started. Square: {square_width}x{square_width}, Shares: {shares:?}")
357            }
358            NodeEvent::ShareSamplingResult {
359                height,
360                row,
361                column,
362                timed_out,
363                ..
364            } => {
365                let s = if *timed_out { "timed out" } else { "finished" };
366                write!(
367                    f,
368                    "Sampling for share [{row}, {column}] of block {height} {s}"
369                )
370            }
371            NodeEvent::SamplingResult {
372                height,
373                timed_out,
374                took,
375            } => {
376                let s = if *timed_out { "timed out" } else { "finished" };
377                write!(f, "Sampling of block {height} {s}. Took: {took:?}")
378            }
379            NodeEvent::FatalDaserError { error } => {
380                write!(f, "Daser stopped because of a fatal error: {error}")
381            }
382            NodeEvent::AddedHeaderFromHeaderSub { height } => {
383                write!(f, "Added header {height} from header-sub")
384            }
385            NodeEvent::FetchingHeadHeaderStarted => {
386                write!(f, "Fetching header of network head block started")
387            }
388            NodeEvent::FetchingHeadHeaderFinished { height, took } => {
389                write!(f, "Fetching header of network head block finished. Height: {height}, Took: {took:?}")
390            }
391            NodeEvent::FetchingHeadersStarted {
392                from_height,
393                to_height,
394            } => {
395                if from_height == to_height {
396                    write!(f, "Fetching header of block {from_height} started")
397                } else {
398                    write!(
399                        f,
400                        "Fetching headers of blocks {from_height}-{to_height} started"
401                    )
402                }
403            }
404            NodeEvent::FetchingHeadersFinished {
405                from_height,
406                to_height,
407                took,
408            } => {
409                if from_height == to_height {
410                    write!(
411                        f,
412                        "Fetching header of block {from_height} finished. Took: {took:?}"
413                    )
414                } else {
415                    write!(f, "Fetching headers of blocks {from_height}-{to_height} finished. Took: {took:?}")
416                }
417            }
418            NodeEvent::FetchingHeadersFailed {
419                from_height,
420                to_height,
421                error,
422                took,
423            } => {
424                if from_height == to_height {
425                    write!(
426                        f,
427                        "Fetching header of block {from_height} failed. Took: {took:?}, Error: {error}"
428                    )
429                } else {
430                    write!(f, "Fetching headers of blocks {from_height}-{to_height} failed. Took: {took:?}, Error: {error}")
431                }
432            }
433            NodeEvent::FatalSyncerError { error } => {
434                write!(f, "Syncer stopped because of a fatal error: {error}")
435            }
436            Self::PrunedHeaders {
437                from_height,
438                to_height,
439            } => {
440                if from_height == to_height {
441                    write!(f, "Header of block {from_height} was pruned")
442                } else {
443                    write!(f, "Headers of blocks {from_height}-{to_height} were pruned")
444                }
445            }
446            NodeEvent::FatalPrunerError { error } => {
447                write!(f, "Pruner stopped because of a fatal error: {error}")
448            }
449            NodeEvent::NetworkCompromised => {
450                write!(f, "The network is compromised and should not be trusted. ")?;
451                write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
452            }
453            NodeEvent::NodeStopped => {
454                write!(f, "Node stopped")
455            }
456        }
457    }
458}
459
460fn serialize_as_string<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
461where
462    T: ToString,
463    S: serde::ser::Serializer,
464{
465    value.to_string().serialize(serializer)
466}
467
/// Serializes a `SystemTime` the way Javascript represents time:
/// milliseconds since the Unix epoch, as an `f64`.
#[cfg(target_arch = "wasm32")]
fn serialize_system_time<S>(value: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::ser::Serializer,
{
    let since_epoch = value
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("SystemTime is before 1970");
    // Javascript expresses time as f64 and in milliseconds.
    let js_time = since_epoch.as_secs_f64() * 1000.0;
    js_time.serialize(serializer)
}