1use std::fmt;
4use std::panic::Location;
5use std::time::Duration;
6
7use libp2p::PeerId;
8use lumina_utils::time::SystemTime;
9use serde::Serialize;
10use tokio::sync::broadcast;
11
12const EVENT_CHANNEL_CAPACITY: usize = 1024;
13
14#[derive(Debug, thiserror::Error)]
16pub enum RecvError {
17 #[error("Event channel closed")]
19 Closed,
20}
21
22#[derive(Debug, thiserror::Error)]
24pub enum TryRecvError {
25 #[error("Event channel empty")]
27 Empty,
28 #[error("Event channel closed")]
30 Closed,
31}
32
33#[derive(Debug)]
35pub(crate) struct EventChannel {
36 tx: broadcast::Sender<NodeEventInfo>,
37}
38
39#[derive(Debug, Clone)]
43pub(crate) struct EventPublisher {
44 tx: broadcast::Sender<NodeEventInfo>,
45}
46
47#[derive(Debug)]
51pub struct EventSubscriber {
52 rx: broadcast::Receiver<NodeEventInfo>,
53}
54
55impl EventChannel {
56 pub(crate) fn new() -> EventChannel {
58 let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
59 EventChannel { tx }
60 }
61
62 pub(crate) fn publisher(&self) -> EventPublisher {
64 EventPublisher {
65 tx: self.tx.clone(),
66 }
67 }
68
69 pub(crate) fn subscribe(&self) -> EventSubscriber {
71 EventSubscriber {
72 rx: self.tx.subscribe(),
73 }
74 }
75}
76
77impl Default for EventChannel {
78 fn default() -> Self {
79 EventChannel::new()
80 }
81}
82
83impl EventPublisher {
84 pub(crate) fn send(&self, event: NodeEvent) {
85 let time = SystemTime::now();
86 let location: &'static Location<'static> = Location::caller();
87
88 let _ = self.tx.send(NodeEventInfo {
91 event,
92 time,
93 file_path: location.file(),
94 file_line: location.line(),
95 });
96 }
97}
98
99impl EventSubscriber {
100 pub async fn recv(&mut self) -> Result<NodeEventInfo, RecvError> {
108 loop {
109 match self.rx.recv().await {
110 Ok(val) => return Ok(val),
111 Err(broadcast::error::RecvError::Lagged(_)) => {
112 continue;
114 }
115 Err(broadcast::error::RecvError::Closed) => return Err(RecvError::Closed),
116 }
117 }
118 }
119
120 pub fn try_recv(&mut self) -> Result<NodeEventInfo, TryRecvError> {
126 loop {
127 match self.rx.try_recv() {
128 Ok(val) => return Ok(val),
129 Err(broadcast::error::TryRecvError::Lagged(_)) => {
130 continue;
132 }
133 Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
134 Err(broadcast::error::TryRecvError::Closed) => return Err(TryRecvError::Closed),
135 }
136 }
137 }
138}
139
140#[derive(Debug, Clone, Serialize)]
142pub struct NodeEventInfo {
143 pub event: NodeEvent,
145 #[cfg_attr(
146 target_arch = "wasm32",
147 serde(serialize_with = "serialize_system_time")
148 )]
149 pub time: SystemTime,
151 pub file_path: &'static str,
153 pub file_line: u32,
155}
156
157#[derive(Debug, Clone, Serialize)]
161#[non_exhaustive]
162#[serde(tag = "type")]
163#[serde(rename_all = "snake_case")]
164pub enum NodeEvent {
165 ConnectingToBootnodes,
167
168 PeerConnected {
170 #[serde(serialize_with = "serialize_as_string")]
171 id: PeerId,
173 trusted: bool,
175 },
176
177 PeerDisconnected {
179 #[serde(serialize_with = "serialize_as_string")]
180 id: PeerId,
182 trusted: bool,
184 },
185
186 SamplingStarted {
188 height: u64,
190 square_width: u16,
192 shares: Vec<(u16, u16)>,
194 },
195
196 ShareSamplingResult {
198 height: u64,
200 square_width: u16,
202 row: u16,
204 column: u16,
206 accepted: bool,
208 },
209
210 SamplingFinished {
212 height: u64,
214 accepted: bool,
216 took: Duration,
218 },
219
220 FatalDaserError {
222 error: String,
224 },
225
226 AddedHeaderFromHeaderSub {
228 height: u64,
230 },
231
232 FetchingHeadHeaderStarted,
234
235 FetchingHeadHeaderFinished {
237 height: u64,
239 took: Duration,
241 },
242
243 FetchingHeadersStarted {
245 from_height: u64,
247 to_height: u64,
249 },
250
251 FetchingHeadersFinished {
253 from_height: u64,
255 to_height: u64,
257 took: Duration,
259 },
260
261 FetchingHeadersFailed {
263 from_height: u64,
265 to_height: u64,
267 error: String,
269 took: Duration,
271 },
272
273 FatalSyncerError {
275 error: String,
277 },
278
279 PrunedHeaders {
281 to_height: u64,
283 },
284
285 FatalPrunerError {
287 error: String,
289 },
290
291 NetworkCompromised,
298
299 NodeStopped,
301}
302
303impl NodeEvent {
304 pub fn is_error(&self) -> bool {
306 match self {
307 NodeEvent::FatalDaserError { .. }
308 | NodeEvent::FatalSyncerError { .. }
309 | NodeEvent::FatalPrunerError { .. }
310 | NodeEvent::FetchingHeadersFailed { .. }
311 | NodeEvent::NetworkCompromised => true,
312 NodeEvent::ConnectingToBootnodes
313 | NodeEvent::PeerConnected { .. }
314 | NodeEvent::PeerDisconnected { .. }
315 | NodeEvent::SamplingStarted { .. }
316 | NodeEvent::ShareSamplingResult { .. }
317 | NodeEvent::SamplingFinished { .. }
318 | NodeEvent::AddedHeaderFromHeaderSub { .. }
319 | NodeEvent::FetchingHeadHeaderStarted
320 | NodeEvent::FetchingHeadHeaderFinished { .. }
321 | NodeEvent::FetchingHeadersStarted { .. }
322 | NodeEvent::FetchingHeadersFinished { .. }
323 | NodeEvent::PrunedHeaders { .. }
324 | NodeEvent::NodeStopped => false,
325 }
326 }
327}
328
329impl fmt::Display for NodeEvent {
330 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
331 match self {
332 NodeEvent::ConnectingToBootnodes => {
333 write!(f, "Connecting to bootnodes")
334 }
335 NodeEvent::PeerConnected { id, trusted } => {
336 if *trusted {
337 write!(f, "Trusted peer connected: {id}")
338 } else {
339 write!(f, "Peer connected: {id}")
340 }
341 }
342 NodeEvent::PeerDisconnected { id, trusted } => {
343 if *trusted {
344 write!(f, "Trusted peer disconnected: {id}")
345 } else {
346 write!(f, "Peer disconnected: {id}")
347 }
348 }
349 NodeEvent::SamplingStarted {
350 height,
351 square_width,
352 shares,
353 } => {
354 write!(f, "Sampling of block {height} started. Square: {square_width}x{square_width}, Shares: {shares:?}")
355 }
356 NodeEvent::ShareSamplingResult {
357 height,
358 row,
359 column,
360 accepted,
361 ..
362 } => {
363 let acc = if *accepted { "accepted" } else { "rejected" };
364 write!(
365 f,
366 "Sampling for share [{row}, {column}] of block {height} was {acc}"
367 )
368 }
369 NodeEvent::SamplingFinished { height, took, .. } => {
370 write!(f, "Sampling of block {height} finished. Took: {took:?}")
371 }
372 NodeEvent::FatalDaserError { error } => {
373 write!(f, "Daser stopped because of a fatal error: {error}")
374 }
375 NodeEvent::AddedHeaderFromHeaderSub { height } => {
376 write!(f, "Added header {height} from header-sub")
377 }
378 NodeEvent::FetchingHeadHeaderStarted => {
379 write!(f, "Fetching header of network head block started")
380 }
381 NodeEvent::FetchingHeadHeaderFinished { height, took } => {
382 write!(f, "Fetching header of network head block finished. Height: {height}, Took: {took:?}")
383 }
384 NodeEvent::FetchingHeadersStarted {
385 from_height,
386 to_height,
387 } => {
388 if from_height == to_height {
389 write!(f, "Fetching header of block {from_height} started")
390 } else {
391 write!(
392 f,
393 "Fetching headers of blocks {from_height}-{to_height} started"
394 )
395 }
396 }
397 NodeEvent::FetchingHeadersFinished {
398 from_height,
399 to_height,
400 took,
401 } => {
402 if from_height == to_height {
403 write!(
404 f,
405 "Fetching header of block {from_height} finished. Took: {took:?}"
406 )
407 } else {
408 write!(f, "Fetching headers of blocks {from_height}-{to_height} finished. Took: {took:?}")
409 }
410 }
411 NodeEvent::FetchingHeadersFailed {
412 from_height,
413 to_height,
414 error,
415 took,
416 } => {
417 if from_height == to_height {
418 write!(
419 f,
420 "Fetching header of block {from_height} failed. Took: {took:?}, Error: {error}"
421 )
422 } else {
423 write!(f, "Fetching headers of blocks {from_height}-{to_height} failed. Took: {took:?}, Error: {error}")
424 }
425 }
426 NodeEvent::FatalSyncerError { error } => {
427 write!(f, "Syncer stopped because of a fatal error: {error}")
428 }
429 Self::PrunedHeaders { to_height } => {
430 write!(f, "Pruned headers up to and including {to_height}")
431 }
432 NodeEvent::FatalPrunerError { error } => {
433 write!(f, "Pruner stopped because of a fatal error: {error}")
434 }
435 NodeEvent::NetworkCompromised => {
436 write!(f, "The network is compromised and should not be trusted. ")?;
437 write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
438 }
439 NodeEvent::NodeStopped => {
440 write!(f, "Node stopped")
441 }
442 }
443 }
444}
445
446fn serialize_as_string<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
447where
448 T: ToString,
449 S: serde::ser::Serializer,
450{
451 value.to_string().serialize(serializer)
452}
453
454#[cfg(target_arch = "wasm32")]
455fn serialize_system_time<S>(value: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
456where
457 S: serde::ser::Serializer,
458{
459 let js_time = value
461 .duration_since(SystemTime::UNIX_EPOCH)
462 .expect("SystemTime is before 1970")
463 .as_secs_f64()
464 * 1000.0;
465 js_time.serialize(serializer)
466}