lumina_node/
events.rs

1//! Events generated by `Node`
2
3use std::fmt;
4use std::panic::Location;
5use std::time::Duration;
6
7use libp2p::PeerId;
8use lumina_utils::time::SystemTime;
9use serde::Serialize;
10use tokio::sync::broadcast;
11
12const EVENT_CHANNEL_CAPACITY: usize = 1024;
13
14/// An error returned from the [`EventSubscriber::recv`].
15#[derive(Debug, thiserror::Error)]
16pub enum RecvError {
17    /// Node and all its event senders are closed.
18    #[error("Event channel closed")]
19    Closed,
20}
21
22/// An error returned from the [`EventSubscriber::try_recv`].
23#[derive(Debug, thiserror::Error)]
24pub enum TryRecvError {
25    /// The event channel is currently empty.
26    #[error("Event channel empty")]
27    Empty,
28    /// Node and all its event senders are closed.
29    #[error("Event channel closed")]
30    Closed,
31}
32
33/// A channel which users can subscribe for events.
34#[derive(Debug)]
35pub(crate) struct EventChannel {
36    tx: broadcast::Sender<NodeEventInfo>,
37}
38
39/// `EventPublisher` is used to broadcast events generated by [`Node`] to [`EventSubscriber`]s.
40///
41/// [`Node`]: crate::node::Node
42#[derive(Debug, Clone)]
43pub(crate) struct EventPublisher {
44    tx: broadcast::Sender<NodeEventInfo>,
45}
46
47/// `EventSubscriber` can be used by users to receive events from [`Node`].
48///
49/// [`Node`]: crate::node::Node
50#[derive(Debug)]
51pub struct EventSubscriber {
52    rx: broadcast::Receiver<NodeEventInfo>,
53}
54
55impl EventChannel {
56    /// Create a new `EventChannel`.
57    pub(crate) fn new() -> EventChannel {
58        let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
59        EventChannel { tx }
60    }
61
62    /// Creates a new [`EventPublisher`].
63    pub(crate) fn publisher(&self) -> EventPublisher {
64        EventPublisher {
65            tx: self.tx.clone(),
66        }
67    }
68
69    /// Creates a new [`EventSubscriber`].
70    pub(crate) fn subscribe(&self) -> EventSubscriber {
71        EventSubscriber {
72            rx: self.tx.subscribe(),
73        }
74    }
75}
76
77impl Default for EventChannel {
78    fn default() -> Self {
79        EventChannel::new()
80    }
81}
82
83impl EventPublisher {
84    pub(crate) fn send(&self, event: NodeEvent) {
85        let time = SystemTime::now();
86        let location: &'static Location<'static> = Location::caller();
87
88        // Error is produced if there aren't any subscribers. Since this is
89        // a valid case, we ignore the error.
90        let _ = self.tx.send(NodeEventInfo {
91            event,
92            time,
93            file_path: location.file(),
94            file_line: location.line(),
95        });
96    }
97}
98
99impl EventSubscriber {
100    /// Receive an event from [`Node`].
101    ///
102    /// # Cancel safety
103    ///
104    /// This method is cancel-safe.
105    ///
106    /// [`Node`]: crate::node::Node
107    pub async fn recv(&mut self) -> Result<NodeEventInfo, RecvError> {
108        loop {
109            match self.rx.recv().await {
110                Ok(val) => return Ok(val),
111                Err(broadcast::error::RecvError::Lagged(_)) => {
112                    // Slow consumer. We will receive a message on the next call.
113                    continue;
114                }
115                Err(broadcast::error::RecvError::Closed) => return Err(RecvError::Closed),
116            }
117        }
118    }
119
120    /// Attempts to receive an already queued event from [`Node`] without awaiting.
121    ///
122    /// If no events are queued, `Err(TryRecvError::Empty)` is returned.
123    ///
124    /// [`Node`]: crate::node::Node
125    pub fn try_recv(&mut self) -> Result<NodeEventInfo, TryRecvError> {
126        loop {
127            match self.rx.try_recv() {
128                Ok(val) => return Ok(val),
129                Err(broadcast::error::TryRecvError::Lagged(_)) => {
130                    // Slow consumer. We will receive a message on the next call.
131                    continue;
132                }
133                Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
134                Err(broadcast::error::TryRecvError::Closed) => return Err(TryRecvError::Closed),
135            }
136        }
137    }
138}
139
140/// This struct include the [`NodeEvent`] and some extra information about the event.
141#[derive(Debug, Clone, Serialize)]
142pub struct NodeEventInfo {
143    /// The event
144    pub event: NodeEvent,
145    #[cfg_attr(
146        target_arch = "wasm32",
147        serde(serialize_with = "serialize_system_time")
148    )]
149    /// When the event was generated.
150    pub time: SystemTime,
151    /// Which file generated the event.
152    pub file_path: &'static str,
153    /// Which line in the file generated the event.
154    pub file_line: u32,
155}
156
157/// The events that [`Node`] can generate.
158///
159/// [`Node`]: crate::node::Node
160#[derive(Debug, Clone, Serialize)]
161#[non_exhaustive]
162#[serde(tag = "type")]
163#[serde(rename_all = "snake_case")]
164pub enum NodeEvent {
165    /// Node is connecting to bootnodes
166    ConnectingToBootnodes,
167
168    /// Peer just connected
169    PeerConnected {
170        #[serde(serialize_with = "serialize_as_string")]
171        /// The ID of the peer.
172        id: PeerId,
173        /// Whether peer was in the trusted list or not.
174        trusted: bool,
175    },
176
177    /// Peer just disconnected
178    PeerDisconnected {
179        #[serde(serialize_with = "serialize_as_string")]
180        /// The ID of the peer.
181        id: PeerId,
182        /// Whether peer was in the trusted list or not.
183        trusted: bool,
184    },
185
186    /// Sampling just started.
187    SamplingStarted {
188        /// The block height that will be sampled.
189        height: u64,
190        /// The square width of the block.
191        square_width: u16,
192        /// The coordinates of the shares that will be sampled.
193        shares: Vec<(u16, u16)>,
194    },
195
196    /// A share was sampled.
197    ShareSamplingResult {
198        /// The block height of the share.
199        height: u64,
200        /// The square width of the block.
201        square_width: u16,
202        /// The row of the share.
203        row: u16,
204        /// The column of the share.
205        column: u16,
206        /// The result of the sampling of the share.
207        accepted: bool,
208    },
209
210    /// Sampling just finished.
211    SamplingFinished {
212        /// The block height that was sampled.
213        height: u64,
214        /// The overall result of the sampling.
215        accepted: bool,
216        /// How much time sampling took.
217        took: Duration,
218    },
219
220    /// Data sampling fatal error.
221    FatalDaserError {
222        /// A human readable error.
223        error: String,
224    },
225
226    /// A new header was added from HeaderSub.
227    AddedHeaderFromHeaderSub {
228        /// The height of the header.
229        height: u64,
230    },
231
232    /// Fetching header of network head just started.
233    FetchingHeadHeaderStarted,
234
235    /// Fetching header of network head just finished.
236    FetchingHeadHeaderFinished {
237        /// The height of the network head.
238        height: u64,
239        /// How much time fetching took.
240        took: Duration,
241    },
242
243    /// Fetching headers of a specific block range just started.
244    FetchingHeadersStarted {
245        /// Start of the range.
246        from_height: u64,
247        /// End of the range (included).
248        to_height: u64,
249    },
250
251    /// Fetching headers of a specific block range just finished.
252    FetchingHeadersFinished {
253        /// Start of the range.
254        from_height: u64,
255        /// End of the range (included).
256        to_height: u64,
257        /// How much time fetching took.
258        took: Duration,
259    },
260
261    /// Fetching headers of a specific block range just failed.
262    FetchingHeadersFailed {
263        /// Start of the range.
264        from_height: u64,
265        /// End of the range (included).
266        to_height: u64,
267        /// A human readable error.
268        error: String,
269        /// How much time fetching took.
270        took: Duration,
271    },
272
273    /// Header syncing fatal error.
274    FatalSyncerError {
275        /// A human readable error.
276        error: String,
277    },
278
279    /// Pruned headers up to and including specified height.
280    PrunedHeaders {
281        /// Last header height that was pruned
282        to_height: u64,
283    },
284
285    /// Pruning fatal error.
286    FatalPrunerError {
287        /// A human readable error.
288        error: String,
289    },
290
291    /// Network was compromised.
292    ///
293    /// This happens when a valid bad encoding fraud proof is received.
294    /// Ideally it would never happen, but protection needs to exist.
295    /// In case of compromised network, syncing and data sampling will
296    /// stop immediately.
297    NetworkCompromised,
298
299    /// Node stopped.
300    NodeStopped,
301}
302
303impl NodeEvent {
304    /// Returns `true` if the event indicates an error.
305    pub fn is_error(&self) -> bool {
306        match self {
307            NodeEvent::FatalDaserError { .. }
308            | NodeEvent::FatalSyncerError { .. }
309            | NodeEvent::FatalPrunerError { .. }
310            | NodeEvent::FetchingHeadersFailed { .. }
311            | NodeEvent::NetworkCompromised => true,
312            NodeEvent::ConnectingToBootnodes
313            | NodeEvent::PeerConnected { .. }
314            | NodeEvent::PeerDisconnected { .. }
315            | NodeEvent::SamplingStarted { .. }
316            | NodeEvent::ShareSamplingResult { .. }
317            | NodeEvent::SamplingFinished { .. }
318            | NodeEvent::AddedHeaderFromHeaderSub { .. }
319            | NodeEvent::FetchingHeadHeaderStarted
320            | NodeEvent::FetchingHeadHeaderFinished { .. }
321            | NodeEvent::FetchingHeadersStarted { .. }
322            | NodeEvent::FetchingHeadersFinished { .. }
323            | NodeEvent::PrunedHeaders { .. }
324            | NodeEvent::NodeStopped => false,
325        }
326    }
327}
328
329impl fmt::Display for NodeEvent {
330    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
331        match self {
332            NodeEvent::ConnectingToBootnodes => {
333                write!(f, "Connecting to bootnodes")
334            }
335            NodeEvent::PeerConnected { id, trusted } => {
336                if *trusted {
337                    write!(f, "Trusted peer connected: {id}")
338                } else {
339                    write!(f, "Peer connected: {id}")
340                }
341            }
342            NodeEvent::PeerDisconnected { id, trusted } => {
343                if *trusted {
344                    write!(f, "Trusted peer disconnected: {id}")
345                } else {
346                    write!(f, "Peer disconnected: {id}")
347                }
348            }
349            NodeEvent::SamplingStarted {
350                height,
351                square_width,
352                shares,
353            } => {
354                write!(f, "Sampling of block {height} started. Square: {square_width}x{square_width}, Shares: {shares:?}")
355            }
356            NodeEvent::ShareSamplingResult {
357                height,
358                row,
359                column,
360                accepted,
361                ..
362            } => {
363                let acc = if *accepted { "accepted" } else { "rejected" };
364                write!(
365                    f,
366                    "Sampling for share [{row}, {column}] of block {height} was {acc}"
367                )
368            }
369            NodeEvent::SamplingFinished { height, took, .. } => {
370                write!(f, "Sampling of block {height} finished. Took: {took:?}")
371            }
372            NodeEvent::FatalDaserError { error } => {
373                write!(f, "Daser stopped because of a fatal error: {error}")
374            }
375            NodeEvent::AddedHeaderFromHeaderSub { height } => {
376                write!(f, "Added header {height} from header-sub")
377            }
378            NodeEvent::FetchingHeadHeaderStarted => {
379                write!(f, "Fetching header of network head block started")
380            }
381            NodeEvent::FetchingHeadHeaderFinished { height, took } => {
382                write!(f, "Fetching header of network head block finished. Height: {height}, Took: {took:?}")
383            }
384            NodeEvent::FetchingHeadersStarted {
385                from_height,
386                to_height,
387            } => {
388                if from_height == to_height {
389                    write!(f, "Fetching header of block {from_height} started")
390                } else {
391                    write!(
392                        f,
393                        "Fetching headers of blocks {from_height}-{to_height} started"
394                    )
395                }
396            }
397            NodeEvent::FetchingHeadersFinished {
398                from_height,
399                to_height,
400                took,
401            } => {
402                if from_height == to_height {
403                    write!(
404                        f,
405                        "Fetching header of block {from_height} finished. Took: {took:?}"
406                    )
407                } else {
408                    write!(f, "Fetching headers of blocks {from_height}-{to_height} finished. Took: {took:?}")
409                }
410            }
411            NodeEvent::FetchingHeadersFailed {
412                from_height,
413                to_height,
414                error,
415                took,
416            } => {
417                if from_height == to_height {
418                    write!(
419                        f,
420                        "Fetching header of block {from_height} failed. Took: {took:?}, Error: {error}"
421                    )
422                } else {
423                    write!(f, "Fetching headers of blocks {from_height}-{to_height} failed. Took: {took:?}, Error: {error}")
424                }
425            }
426            NodeEvent::FatalSyncerError { error } => {
427                write!(f, "Syncer stopped because of a fatal error: {error}")
428            }
429            Self::PrunedHeaders { to_height } => {
430                write!(f, "Pruned headers up to and including {to_height}")
431            }
432            NodeEvent::FatalPrunerError { error } => {
433                write!(f, "Pruner stopped because of a fatal error: {error}")
434            }
435            NodeEvent::NetworkCompromised => {
436                write!(f, "The network is compromised and should not be trusted. ")?;
437                write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
438            }
439            NodeEvent::NodeStopped => {
440                write!(f, "Node stopped")
441            }
442        }
443    }
444}
445
446fn serialize_as_string<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
447where
448    T: ToString,
449    S: serde::ser::Serializer,
450{
451    value.to_string().serialize(serializer)
452}
453
454#[cfg(target_arch = "wasm32")]
455fn serialize_system_time<S>(value: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
456where
457    S: serde::ser::Serializer,
458{
459    // Javascript expresses time as f64 and in milliseconds.
460    let js_time = value
461        .duration_since(SystemTime::UNIX_EPOCH)
462        .expect("SystemTime is before 1970")
463        .as_secs_f64()
464        * 1000.0;
465    js_time.serialize(serializer)
466}