use std::{
io::{self, Write},
time::{SystemTime, UNIX_EPOCH},
};
use brk_mempool::Cycle;
use brk_types::{Addr, AddrBytes, BlockHash, NextBlockHash, RecommendedFees, Txid};
use rustc_hash::FxHashSet;
use crate::event::Event;
/// Stateful writer that turns successive mempool [`Cycle`]s into [`Event`] lines.
///
/// The fields remember what was last reported, so that `emit` only writes a tip,
/// block or fees event when the corresponding value differs from the previous call.
/// The derived `Default` leaves every field `None`, which makes the first `emit`
/// report all three.
#[derive(Default)]
pub struct Emitter {
/// Tip hash seen on the previous `emit`; a tip event is written when it changes.
prev_tip_hash: Option<BlockHash>,
/// `snapshot.next_block_hash` seen on the previous `emit`; a block event is written when it changes.
prev_next_block_hash: Option<NextBlockHash>,
/// Txids returned by `snapshot.block0_txids()` at the last block event, used to compute the added/removed delta.
prev_block0: Option<FxHashSet<Txid>>,
/// `snapshot.fees` seen on the previous `emit`; a fees event is written when it changes.
prev_fees: Option<RecommendedFees>,
}
impl Emitter {
    /// Writes every event derived from `cycle` to `out`, one JSON document per line,
    /// then flushes `out`.
    ///
    /// Events are written in this order, all sharing a single timestamp:
    /// 1. an enter event per transaction in `cycle.added`, a leave event per
    ///    transaction in `cycle.removed`;
    /// 2. an address enter/leave event per entry in `cycle.addr_enters` /
    ///    `cycle.addr_leaves` (entries that cannot be converted to an `Addr` are
    ///    reported on stderr and skipped);
    /// 3. a tip event, only if the tip hash differs from the previous call;
    /// 4. a block event, only if the snapshot's next-block hash differs from the
    ///    previous call, carrying the txids added to / removed from block 0 since
    ///    the last block event;
    /// 5. a fees event, only if the snapshot's fees differ from the previous call;
    /// 6. a summary event, always.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while serializing or writing an event, or
    /// while flushing `out`.
    pub fn emit<W: Write>(&mut self, out: &mut W, cycle: &Cycle) -> io::Result<()> {
        // Sampled once so that every event of this cycle carries the same timestamp.
        let t = now_secs();

        for tx in &cycle.added {
            write_line(out, &Event::enter(t, tx))?;
        }
        for tx in &cycle.removed {
            write_line(out, &Event::leave(t, tx))?;
        }
        for bytes in &cycle.addr_enters {
            Self::emit_addr(out, t, bytes, Event::addr_enter)?;
        }
        for bytes in &cycle.addr_leaves {
            Self::emit_addr(out, t, bytes, Event::addr_leave)?;
        }

        if self.prev_tip_hash != Some(cycle.tip_hash) {
            self.prev_tip_hash = Some(cycle.tip_hash);
            write_line(out, &Event::tip(t, cycle.tip_hash, cycle.tip_height))?;
        }

        let next_block_hash = cycle.snapshot.next_block_hash;
        if self.prev_next_block_hash != Some(next_block_hash) {
            self.prev_next_block_hash = Some(next_block_hash);
            let current: FxHashSet<Txid> = cycle.snapshot.block0_txids().collect();
            // With no previous block 0 on record, every current txid counts as added.
            let (added, removed) = match &self.prev_block0 {
                Some(prev) => (
                    current.difference(prev).copied().collect(),
                    prev.difference(&current).copied().collect(),
                ),
                None => (current.iter().copied().collect(), Vec::new()),
            };
            write_line(out, &Event::block(t, next_block_hash, added, removed))?;
            self.prev_block0 = Some(current);
        }

        if self.prev_fees.as_ref() != Some(&cycle.snapshot.fees) {
            self.prev_fees = Some(cycle.snapshot.fees.clone());
            write_line(out, &Event::fees(t, &cycle.snapshot.fees))?;
        }

        write_line(out, &Event::summary(t, cycle))?;
        out.flush()
    }

    /// Converts `bytes` to an `Addr` and writes the event built by `make_event`.
    ///
    /// A failed conversion is not an error for the caller: it is logged to stderr
    /// and the event is skipped, so one bad address does not abort the whole cycle.
    ///
    /// # Errors
    ///
    /// Returns any error produced while writing the event line.
    fn emit_addr<W: Write>(
        out: &mut W,
        t: f64,
        bytes: &AddrBytes,
        make_event: fn(f64, Addr) -> Event,
    ) -> io::Result<()> {
        match Addr::try_from(bytes) {
            Ok(addr) => write_line(out, &make_event(t, addr)),
            Err(e) => {
                eprintln!("mmpl: skipping addr event: {e}");
                Ok(())
            }
        }
    }
}
/// Serializes `ev` as JSON into `out` and terminates it with a newline.
///
/// # Errors
///
/// Returns the error from serialization or from writing the newline. The
/// `serde_json::Error` is converted through its `From` impl for `io::Error`, which
/// hands back an underlying I/O error unchanged, so callers still see its original
/// `ErrorKind` (e.g. `BrokenPipe`) rather than a blanket `Other`.
fn write_line<W: Write>(out: &mut W, ev: &Event) -> io::Result<()> {
    serde_json::to_writer(&mut *out, ev)?;
    out.write_all(b"\n")
}
/// Returns the current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock is set before the epoch, the result is the negative offset
/// from the epoch rather than a panic.
fn now_secs() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // `SystemTimeError::duration` reports how far the clock sits before the epoch.
        Err(before_epoch) => -before_epoch.duration().as_secs_f64(),
    }
}