1use std::fmt;
4use std::panic::Location;
5use std::time::Duration;
6
7use libp2p::PeerId;
8use lumina_utils::time::SystemTime;
9use serde::Serialize;
10use tokio::sync::broadcast;
11
/// Per-subscriber buffer size of the broadcast channel; subscribers that fall
/// more than this many events behind start lagging (see `recv`/`try_recv`).
const EVENT_CHANNEL_CAPACITY: usize = 1024;
13
/// Error returned by [`EventSubscriber::recv`].
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
    /// The sending side of the event channel was dropped; no more events will arrive.
    #[error("Event channel closed")]
    Closed,
}
21
/// Error returned by [`EventSubscriber::try_recv`].
#[derive(Debug, thiserror::Error)]
pub enum TryRecvError {
    /// No event is currently queued for this subscriber.
    #[error("Event channel empty")]
    Empty,
    /// The sending side of the event channel was dropped; no more events will arrive.
    #[error("Event channel closed")]
    Closed,
}
32
/// Owner of the node's event broadcast channel; hands out publishers
/// (`publisher`) and subscribers (`subscribe`).
#[derive(Debug)]
pub(crate) struct EventChannel {
    // Broadcast sender; cloned into each `EventPublisher`.
    tx: broadcast::Sender<NodeEventInfo>,
}
38
/// Cloneable handle used by node components to emit [`NodeEvent`]s
/// into the event channel.
#[derive(Debug, Clone)]
pub(crate) struct EventPublisher {
    // Shared broadcast sender, cloned from the owning `EventChannel`.
    tx: broadcast::Sender<NodeEventInfo>,
}
46
/// Receiver of [`NodeEventInfo`]s broadcast through an [`EventChannel`].
#[derive(Debug)]
pub struct EventSubscriber {
    // Broadcast receiver obtained from `EventChannel::subscribe`.
    rx: broadcast::Receiver<NodeEventInfo>,
}
54
55impl EventChannel {
56 pub(crate) fn new() -> EventChannel {
58 let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
59 EventChannel { tx }
60 }
61
62 pub(crate) fn publisher(&self) -> EventPublisher {
64 EventPublisher {
65 tx: self.tx.clone(),
66 }
67 }
68
69 pub(crate) fn subscribe(&self) -> EventSubscriber {
71 EventSubscriber {
72 rx: self.tx.subscribe(),
73 }
74 }
75}
76
77impl Default for EventChannel {
78 fn default() -> Self {
79 EventChannel::new()
80 }
81}
82
83impl EventPublisher {
84 pub(crate) fn send(&self, event: NodeEvent) {
85 let time = SystemTime::now();
86 let location: &'static Location<'static> = Location::caller();
87
88 let _ = self.tx.send(NodeEventInfo {
91 event,
92 time,
93 file_path: location.file(),
94 file_line: location.line(),
95 });
96 }
97}
98
99impl EventSubscriber {
100 pub async fn recv(&mut self) -> Result<NodeEventInfo, RecvError> {
108 loop {
109 match self.rx.recv().await {
110 Ok(val) => return Ok(val),
111 Err(broadcast::error::RecvError::Lagged(_)) => {
112 continue;
114 }
115 Err(broadcast::error::RecvError::Closed) => return Err(RecvError::Closed),
116 }
117 }
118 }
119
120 pub fn try_recv(&mut self) -> Result<NodeEventInfo, TryRecvError> {
126 loop {
127 match self.rx.try_recv() {
128 Ok(val) => return Ok(val),
129 Err(broadcast::error::TryRecvError::Lagged(_)) => {
130 continue;
132 }
133 Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
134 Err(broadcast::error::TryRecvError::Closed) => return Err(TryRecvError::Closed),
135 }
136 }
137 }
138}
139
/// A [`NodeEvent`] together with metadata about when and where it was emitted.
#[derive(Debug, Clone, Serialize)]
pub struct NodeEventInfo {
    /// The event itself.
    pub event: NodeEvent,
    /// When the event was emitted.
    // On wasm32 this is serialized as JavaScript-style fractional milliseconds
    // since the Unix epoch (see `serialize_system_time`).
    #[cfg_attr(
        target_arch = "wasm32",
        serde(serialize_with = "serialize_system_time")
    )]
    pub time: SystemTime,
    /// Source file that emitted the event (from `Location::file`).
    pub file_path: &'static str,
    /// Line in `file_path` that emitted the event (from `Location::line`).
    pub file_line: u32,
}
156
/// Events emitted by the node.
///
/// Serialized as an internally tagged object: `{"type": "<snake_case variant>", ...}`.
/// The enum is `#[non_exhaustive]`; new variants may be added.
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum NodeEvent {
    /// Node started connecting to the bootnodes.
    ConnectingToBootnodes,

    /// A connection to a peer was established.
    PeerConnected {
        /// Id of the peer.
        #[serde(serialize_with = "serialize_as_string")]
        id: PeerId,
        /// Whether the peer is a trusted one.
        trusted: bool,
    },

    /// A connection to a peer was closed.
    PeerDisconnected {
        /// Id of the peer.
        #[serde(serialize_with = "serialize_as_string")]
        id: PeerId,
        /// Whether the peer is a trusted one.
        trusted: bool,
    },

    /// Data sampling of a block started.
    SamplingStarted {
        /// Height of the block.
        height: u64,
        /// Width of the block's square.
        square_width: u16,
        /// Coordinates of the shares to be sampled; presumably (row, column)
        /// pairs, matching `ShareSamplingResult` — TODO confirm ordering.
        shares: Vec<(u16, u16)>,
    },

    /// Sampling of a single share finished or timed out.
    ShareSamplingResult {
        /// Height of the block.
        height: u64,
        /// Width of the block's square.
        square_width: u16,
        /// Row of the share.
        row: u16,
        /// Column of the share.
        column: u16,
        /// Whether the sampling timed out.
        timed_out: bool,
    },

    /// Data sampling of a whole block finished or timed out.
    SamplingResult {
        /// Height of the block.
        height: u64,
        /// Whether the sampling timed out.
        timed_out: bool,
        /// How long the sampling took.
        took: Duration,
    },

    /// The daser stopped because of a fatal error.
    FatalDaserError {
        /// Description of the error.
        error: String,
    },

    /// A new header was added from the header-sub.
    AddedHeaderFromHeaderSub {
        /// Height of the header.
        height: u64,
    },

    /// Fetching the header of the network head block started.
    FetchingHeadHeaderStarted,

    /// Fetching the header of the network head block finished.
    FetchingHeadHeaderFinished {
        /// Height of the head block.
        height: u64,
        /// How long the fetch took.
        took: Duration,
    },

    /// Fetching a range of headers started.
    FetchingHeadersStarted {
        /// Start of the height range (inclusive).
        from_height: u64,
        /// End of the height range (inclusive).
        to_height: u64,
    },

    /// Fetching a range of headers finished.
    FetchingHeadersFinished {
        /// Start of the height range (inclusive).
        from_height: u64,
        /// End of the height range (inclusive).
        to_height: u64,
        /// How long the fetch took.
        took: Duration,
    },

    /// Fetching a range of headers failed.
    FetchingHeadersFailed {
        /// Start of the height range (inclusive).
        from_height: u64,
        /// End of the height range (inclusive).
        to_height: u64,
        /// Description of the error.
        error: String,
        /// How long the fetch took before failing.
        took: Duration,
    },

    /// The syncer stopped because of a fatal error.
    FatalSyncerError {
        /// Description of the error.
        error: String,
    },

    /// A range of headers was pruned.
    PrunedHeaders {
        /// Start of the pruned height range (inclusive).
        from_height: u64,
        /// End of the pruned height range (inclusive).
        to_height: u64,
    },

    /// The pruner stopped because of a fatal error.
    FatalPrunerError {
        /// Description of the error.
        error: String,
    },

    /// The network is compromised and should not be trusted; the node stops
    /// synchronizing and sampling (see the `Display` text for this variant).
    NetworkCompromised,

    /// The node stopped.
    NodeStopped,
}
304
305impl NodeEvent {
306 pub fn is_error(&self) -> bool {
308 match self {
309 NodeEvent::FatalDaserError { .. }
310 | NodeEvent::FatalSyncerError { .. }
311 | NodeEvent::FatalPrunerError { .. }
312 | NodeEvent::FetchingHeadersFailed { .. }
313 | NodeEvent::NetworkCompromised => true,
314 NodeEvent::ConnectingToBootnodes
315 | NodeEvent::PeerConnected { .. }
316 | NodeEvent::PeerDisconnected { .. }
317 | NodeEvent::SamplingStarted { .. }
318 | NodeEvent::ShareSamplingResult { .. }
319 | NodeEvent::SamplingResult { .. }
320 | NodeEvent::AddedHeaderFromHeaderSub { .. }
321 | NodeEvent::FetchingHeadHeaderStarted
322 | NodeEvent::FetchingHeadHeaderFinished { .. }
323 | NodeEvent::FetchingHeadersStarted { .. }
324 | NodeEvent::FetchingHeadersFinished { .. }
325 | NodeEvent::PrunedHeaders { .. }
326 | NodeEvent::NodeStopped => false,
327 }
328 }
329}
330
331impl fmt::Display for NodeEvent {
332 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
333 match self {
334 NodeEvent::ConnectingToBootnodes => {
335 write!(f, "Connecting to bootnodes")
336 }
337 NodeEvent::PeerConnected { id, trusted } => {
338 if *trusted {
339 write!(f, "Trusted peer connected: {id}")
340 } else {
341 write!(f, "Peer connected: {id}")
342 }
343 }
344 NodeEvent::PeerDisconnected { id, trusted } => {
345 if *trusted {
346 write!(f, "Trusted peer disconnected: {id}")
347 } else {
348 write!(f, "Peer disconnected: {id}")
349 }
350 }
351 NodeEvent::SamplingStarted {
352 height,
353 square_width,
354 shares,
355 } => {
356 write!(f, "Sampling of block {height} started. Square: {square_width}x{square_width}, Shares: {shares:?}")
357 }
358 NodeEvent::ShareSamplingResult {
359 height,
360 row,
361 column,
362 timed_out,
363 ..
364 } => {
365 let s = if *timed_out { "timed out" } else { "finished" };
366 write!(
367 f,
368 "Sampling for share [{row}, {column}] of block {height} {s}"
369 )
370 }
371 NodeEvent::SamplingResult {
372 height,
373 timed_out,
374 took,
375 } => {
376 let s = if *timed_out { "timed out" } else { "finished" };
377 write!(f, "Sampling of block {height} {s}. Took: {took:?}")
378 }
379 NodeEvent::FatalDaserError { error } => {
380 write!(f, "Daser stopped because of a fatal error: {error}")
381 }
382 NodeEvent::AddedHeaderFromHeaderSub { height } => {
383 write!(f, "Added header {height} from header-sub")
384 }
385 NodeEvent::FetchingHeadHeaderStarted => {
386 write!(f, "Fetching header of network head block started")
387 }
388 NodeEvent::FetchingHeadHeaderFinished { height, took } => {
389 write!(f, "Fetching header of network head block finished. Height: {height}, Took: {took:?}")
390 }
391 NodeEvent::FetchingHeadersStarted {
392 from_height,
393 to_height,
394 } => {
395 if from_height == to_height {
396 write!(f, "Fetching header of block {from_height} started")
397 } else {
398 write!(
399 f,
400 "Fetching headers of blocks {from_height}-{to_height} started"
401 )
402 }
403 }
404 NodeEvent::FetchingHeadersFinished {
405 from_height,
406 to_height,
407 took,
408 } => {
409 if from_height == to_height {
410 write!(
411 f,
412 "Fetching header of block {from_height} finished. Took: {took:?}"
413 )
414 } else {
415 write!(f, "Fetching headers of blocks {from_height}-{to_height} finished. Took: {took:?}")
416 }
417 }
418 NodeEvent::FetchingHeadersFailed {
419 from_height,
420 to_height,
421 error,
422 took,
423 } => {
424 if from_height == to_height {
425 write!(
426 f,
427 "Fetching header of block {from_height} failed. Took: {took:?}, Error: {error}"
428 )
429 } else {
430 write!(f, "Fetching headers of blocks {from_height}-{to_height} failed. Took: {took:?}, Error: {error}")
431 }
432 }
433 NodeEvent::FatalSyncerError { error } => {
434 write!(f, "Syncer stopped because of a fatal error: {error}")
435 }
436 Self::PrunedHeaders {
437 from_height,
438 to_height,
439 } => {
440 if from_height == to_height {
441 write!(f, "Header of block {from_height} was pruned")
442 } else {
443 write!(f, "Headers of blocks {from_height}-{to_height} were pruned")
444 }
445 }
446 NodeEvent::FatalPrunerError { error } => {
447 write!(f, "Pruner stopped because of a fatal error: {error}")
448 }
449 NodeEvent::NetworkCompromised => {
450 write!(f, "The network is compromised and should not be trusted. ")?;
451 write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
452 }
453 NodeEvent::NodeStopped => {
454 write!(f, "Node stopped")
455 }
456 }
457 }
458}
459
460fn serialize_as_string<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
461where
462 T: ToString,
463 S: serde::ser::Serializer,
464{
465 value.to_string().serialize(serializer)
466}
467
/// Serde helper (wasm32 only): serializes a `SystemTime` as JavaScript-style
/// fractional milliseconds since the Unix epoch.
#[cfg(target_arch = "wasm32")]
fn serialize_system_time<S>(value: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::ser::Serializer,
{
    // NOTE(review): panics if `value` predates the Unix epoch; assumed
    // unreachable since event timestamps come from `SystemTime::now()` —
    // confirm no other callers pass arbitrary times.
    let js_time = value
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("SystemTime is before 1970")
        .as_secs_f64()
        * 1000.0;
    js_time.serialize(serializer)
}