1use std::fmt;
4use std::panic::Location;
5use std::time::Duration;
6
7use libp2p::PeerId;
8use lumina_utils::time::SystemTime;
9use serde::Serialize;
10use tokio::sync::broadcast;
11
12const EVENT_CHANNEL_CAPACITY: usize = 1024;
13
/// Error returned by [`EventSubscriber::recv`].
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
    /// The event channel was closed: every sender has been dropped.
    #[error("Event channel closed")]
    Closed,
}
21
/// Error returned by [`EventSubscriber::try_recv`].
#[derive(Debug, thiserror::Error)]
pub enum TryRecvError {
    /// No event is currently queued for this subscriber.
    #[error("Event channel empty")]
    Empty,
    /// The event channel was closed: every sender has been dropped.
    #[error("Event channel closed")]
    Closed,
}
32
/// Owner of the broadcast channel that distributes [`NodeEventInfo`].
///
/// Publishers are created with [`EventChannel::publisher`] and subscribers
/// with [`EventChannel::subscribe`].
#[derive(Debug)]
pub(crate) struct EventChannel {
    // Sending half; receivers are created on demand via `tx.subscribe()`.
    tx: broadcast::Sender<NodeEventInfo>,
}
38
/// Cloneable handle for publishing [`NodeEvent`]s into an [`EventChannel`].
#[derive(Debug, Clone)]
pub(crate) struct EventPublisher {
    // Clone of the channel's sending half.
    tx: broadcast::Sender<NodeEventInfo>,
}
46
/// Receiving side of an [`EventChannel`]; yields [`NodeEventInfo`] as events
/// are published. Created via `EventChannel::subscribe`.
#[derive(Debug)]
pub struct EventSubscriber {
    // Broadcast receiver; may lag and drop old events if not drained fast enough.
    rx: broadcast::Receiver<NodeEventInfo>,
}
54
55impl EventChannel {
56 pub(crate) fn new() -> EventChannel {
58 let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
59 EventChannel { tx }
60 }
61
62 pub(crate) fn publisher(&self) -> EventPublisher {
64 EventPublisher {
65 tx: self.tx.clone(),
66 }
67 }
68
69 pub(crate) fn subscribe(&self) -> EventSubscriber {
71 EventSubscriber {
72 rx: self.tx.subscribe(),
73 }
74 }
75}
76
77impl Default for EventChannel {
78 fn default() -> Self {
79 EventChannel::new()
80 }
81}
82
83impl EventPublisher {
84 pub(crate) fn send(&self, event: NodeEvent) {
85 let time = SystemTime::now();
86 let location: &'static Location<'static> = Location::caller();
87
88 let _ = self.tx.send(NodeEventInfo {
91 event,
92 time,
93 file_path: location.file(),
94 file_line: location.line(),
95 });
96 }
97}
98
99impl EventSubscriber {
100 pub async fn recv(&mut self) -> Result<NodeEventInfo, RecvError> {
108 loop {
109 match self.rx.recv().await {
110 Ok(val) => return Ok(val),
111 Err(broadcast::error::RecvError::Lagged(_)) => {
112 continue;
114 }
115 Err(broadcast::error::RecvError::Closed) => return Err(RecvError::Closed),
116 }
117 }
118 }
119
120 pub fn try_recv(&mut self) -> Result<NodeEventInfo, TryRecvError> {
126 loop {
127 match self.rx.try_recv() {
128 Ok(val) => return Ok(val),
129 Err(broadcast::error::TryRecvError::Lagged(_)) => {
130 continue;
132 }
133 Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
134 Err(broadcast::error::TryRecvError::Closed) => return Err(TryRecvError::Closed),
135 }
136 }
137 }
138}
139
/// A [`NodeEvent`] together with when and where in the source it was emitted.
#[derive(Debug, Clone, Serialize)]
pub struct NodeEventInfo {
    /// The event itself.
    pub event: NodeEvent,
    /// Time at which the event was sent. On wasm32 it is serialized as
    /// JS-style fractional milliseconds since the Unix epoch.
    #[cfg_attr(
        target_arch = "wasm32",
        serde(serialize_with = "serialize_system_time")
    )]
    pub time: SystemTime,
    /// Source file of the `send` call that produced the event.
    pub file_path: &'static str,
    /// Line within `file_path` of the `send` call.
    pub file_line: u32,
}
156
/// Events emitted by the node and its subsystems (syncer, daser, pruner).
///
/// Serialized as an externally visible JSON object with a snake_case `type`
/// tag. Marked `#[non_exhaustive]`: downstream matches must allow for new
/// variants.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum NodeEvent {
    /// Node started connecting to the bootnodes.
    ConnectingToBootnodes,

    /// A peer connected.
    PeerConnected {
        /// Peer ID, serialized as a string.
        #[serde(serialize_with = "serialize_as_string")]
        id: PeerId,
        /// Whether the peer is considered trusted.
        trusted: bool,
    },

    /// A peer disconnected.
    PeerDisconnected {
        /// Peer ID, serialized as a string.
        #[serde(serialize_with = "serialize_as_string")]
        id: PeerId,
        /// Whether the peer was considered trusted.
        trusted: bool,
    },

    /// Data sampling of a block started.
    SamplingStarted {
        /// Height of the block being sampled.
        height: u64,
        /// Width of the block's data square.
        square_width: u16,
        /// (row, column) coordinates of the shares selected for sampling.
        shares: Vec<(u16, u16)>,
    },

    /// Sampling of a single share finished or timed out.
    ShareSamplingResult {
        /// Height of the block being sampled.
        height: u64,
        /// Width of the block's data square.
        square_width: u16,
        /// Row of the sampled share.
        row: u16,
        /// Column of the sampled share.
        column: u16,
        /// Whether the sampling of this share timed out.
        timed_out: bool,
    },

    /// Sampling of a whole block finished or timed out.
    SamplingResult {
        /// Height of the sampled block.
        height: u64,
        /// Whether sampling timed out.
        timed_out: bool,
        /// How long sampling took.
        took: Duration,
    },

    /// The daser stopped because of a fatal error.
    FatalDaserError {
        /// Description of the error.
        error: String,
    },

    /// A header received via header-sub was added to the store.
    AddedHeaderFromHeaderSub {
        /// Height of the added header.
        height: u64,
    },

    /// Fetching the header of the network head block started.
    FetchingHeadHeaderStarted,

    /// Fetching the header of the network head block finished.
    FetchingHeadHeaderFinished {
        /// Height of the network head block.
        height: u64,
        /// How long the fetch took.
        took: Duration,
    },

    /// Fetching a range of headers started.
    FetchingHeadersStarted {
        /// First height of the requested range (inclusive).
        from_height: u64,
        /// Last height of the requested range (inclusive).
        to_height: u64,
    },

    /// Fetching a range of headers finished.
    FetchingHeadersFinished {
        /// First height of the fetched range (inclusive).
        from_height: u64,
        /// Last height of the fetched range (inclusive).
        to_height: u64,
        /// How long the fetch took.
        took: Duration,
    },

    /// Fetching a range of headers failed.
    FetchingHeadersFailed {
        /// First height of the requested range (inclusive).
        from_height: u64,
        /// Last height of the requested range (inclusive).
        to_height: u64,
        /// Description of the error.
        error: String,
        /// How long the attempt took before failing.
        took: Duration,
    },

    /// The syncer stopped because of a fatal error.
    FatalSyncerError {
        /// Description of the error.
        error: String,
    },

    /// A range of headers was pruned from the store.
    PrunedHeaders {
        /// First pruned height (inclusive).
        from_height: u64,
        /// Last pruned height (inclusive).
        to_height: u64,
    },

    /// The pruner stopped because of a fatal error.
    FatalPrunerError {
        /// Description of the error.
        error: String,
    },

    /// The network was detected as compromised; synchronizing and sampling
    /// stop (see the `Display` impl for the user-facing message).
    NetworkCompromised,

    /// The node stopped.
    NodeStopped,
}
304
305impl NodeEvent {
306 pub fn is_error(&self) -> bool {
308 match self {
309 NodeEvent::FatalDaserError { .. }
310 | NodeEvent::FatalSyncerError { .. }
311 | NodeEvent::FatalPrunerError { .. }
312 | NodeEvent::FetchingHeadersFailed { .. }
313 | NodeEvent::NetworkCompromised => true,
314 NodeEvent::ConnectingToBootnodes
315 | NodeEvent::PeerConnected { .. }
316 | NodeEvent::PeerDisconnected { .. }
317 | NodeEvent::SamplingStarted { .. }
318 | NodeEvent::ShareSamplingResult { .. }
319 | NodeEvent::SamplingResult { .. }
320 | NodeEvent::AddedHeaderFromHeaderSub { .. }
321 | NodeEvent::FetchingHeadHeaderStarted
322 | NodeEvent::FetchingHeadHeaderFinished { .. }
323 | NodeEvent::FetchingHeadersStarted { .. }
324 | NodeEvent::FetchingHeadersFinished { .. }
325 | NodeEvent::PrunedHeaders { .. }
326 | NodeEvent::NodeStopped => false,
327 }
328 }
329}
330
331impl fmt::Display for NodeEvent {
332 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
333 match self {
334 NodeEvent::ConnectingToBootnodes => {
335 write!(f, "Connecting to bootnodes")
336 }
337 NodeEvent::PeerConnected { id, trusted } => {
338 if *trusted {
339 write!(f, "Trusted peer connected: {id}")
340 } else {
341 write!(f, "Peer connected: {id}")
342 }
343 }
344 NodeEvent::PeerDisconnected { id, trusted } => {
345 if *trusted {
346 write!(f, "Trusted peer disconnected: {id}")
347 } else {
348 write!(f, "Peer disconnected: {id}")
349 }
350 }
351 NodeEvent::SamplingStarted {
352 height,
353 square_width,
354 shares,
355 } => {
356 write!(
357 f,
358 "Sampling of block {height} started. Square: {square_width}x{square_width}, Shares: {shares:?}"
359 )
360 }
361 NodeEvent::ShareSamplingResult {
362 height,
363 row,
364 column,
365 timed_out,
366 ..
367 } => {
368 let s = if *timed_out { "timed out" } else { "finished" };
369 write!(
370 f,
371 "Sampling for share [{row}, {column}] of block {height} {s}"
372 )
373 }
374 NodeEvent::SamplingResult {
375 height,
376 timed_out,
377 took,
378 } => {
379 let s = if *timed_out { "timed out" } else { "finished" };
380 write!(f, "Sampling of block {height} {s}. Took: {took:?}")
381 }
382 NodeEvent::FatalDaserError { error } => {
383 write!(f, "Daser stopped because of a fatal error: {error}")
384 }
385 NodeEvent::AddedHeaderFromHeaderSub { height } => {
386 write!(f, "Added header {height} from header-sub")
387 }
388 NodeEvent::FetchingHeadHeaderStarted => {
389 write!(f, "Fetching header of network head block started")
390 }
391 NodeEvent::FetchingHeadHeaderFinished { height, took } => {
392 write!(
393 f,
394 "Fetching header of network head block finished. Height: {height}, Took: {took:?}"
395 )
396 }
397 NodeEvent::FetchingHeadersStarted {
398 from_height,
399 to_height,
400 } => {
401 if from_height == to_height {
402 write!(f, "Fetching header of block {from_height} started")
403 } else {
404 write!(
405 f,
406 "Fetching headers of blocks {from_height}-{to_height} started"
407 )
408 }
409 }
410 NodeEvent::FetchingHeadersFinished {
411 from_height,
412 to_height,
413 took,
414 } => {
415 if from_height == to_height {
416 write!(
417 f,
418 "Fetching header of block {from_height} finished. Took: {took:?}"
419 )
420 } else {
421 write!(
422 f,
423 "Fetching headers of blocks {from_height}-{to_height} finished. Took: {took:?}"
424 )
425 }
426 }
427 NodeEvent::FetchingHeadersFailed {
428 from_height,
429 to_height,
430 error,
431 took,
432 } => {
433 if from_height == to_height {
434 write!(
435 f,
436 "Fetching header of block {from_height} failed. Took: {took:?}, Error: {error}"
437 )
438 } else {
439 write!(
440 f,
441 "Fetching headers of blocks {from_height}-{to_height} failed. Took: {took:?}, Error: {error}"
442 )
443 }
444 }
445 NodeEvent::FatalSyncerError { error } => {
446 write!(f, "Syncer stopped because of a fatal error: {error}")
447 }
448 Self::PrunedHeaders {
449 from_height,
450 to_height,
451 } => {
452 if from_height == to_height {
453 write!(f, "Header of block {from_height} was pruned")
454 } else {
455 write!(f, "Headers of blocks {from_height}-{to_height} were pruned")
456 }
457 }
458 NodeEvent::FatalPrunerError { error } => {
459 write!(f, "Pruner stopped because of a fatal error: {error}")
460 }
461 NodeEvent::NetworkCompromised => {
462 write!(f, "The network is compromised and should not be trusted. ")?;
463 write!(
464 f,
465 "Node stopped synchronizing and sampling, but you can still make some queries to the network."
466 )
467 }
468 NodeEvent::NodeStopped => {
469 write!(f, "Node stopped")
470 }
471 }
472 }
473}
474
475fn serialize_as_string<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
476where
477 T: ToString,
478 S: serde::ser::Serializer,
479{
480 value.to_string().serialize(serializer)
481}
482
/// Serializes a `SystemTime` as JS-style milliseconds since the Unix epoch
/// (an `f64`, matching `Date.now()` units but keeping sub-millisecond
/// precision).
#[cfg(target_arch = "wasm32")]
fn serialize_system_time<S>(value: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::ser::Serializer,
{
    let since_epoch = value
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("SystemTime is before 1970");
    // Scale fractional seconds rather than using `as_millis()` so the
    // fractional part of a millisecond is preserved.
    let millis = since_epoch.as_secs_f64() * 1000.0;
    millis.serialize(serializer)
}