use super::{EventKind, NetLogMessage};
use crate::{config::GlobalExecutor, message::Transaction, node::PeerId};
use anyhow::Result;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;
#[allow(async_fn_in_trait)]
pub trait EventSource: Send + Sync {
    /// Returns every event this source currently knows about.
    async fn get_events(&self) -> Result<Vec<NetLogMessage>>;

    /// Returns only the events belonging to transaction `tx`, in the same
    /// order the source produced them.
    async fn get_events_for_transaction(&self, tx: &Transaction) -> Result<Vec<NetLogMessage>> {
        let mut matching = self.get_events().await?;
        matching.retain(|event| &event.tx == tx);
        Ok(matching)
    }

    /// Optional human-readable name for this source (e.g. a node label).
    fn get_label(&self) -> Option<String> {
        None
    }
}
/// [`EventSource`] backed by an append-only (AOF) log file on disk.
#[derive(Clone)]
pub struct AOFEventSource {
    // Path of the AOF log file events are read from.
    path: PathBuf,
    // Optional human-readable name for this source (e.g. a node name).
    label: Option<String>,
    // Lazily-populated cache of the parsed file; shared across clones so the
    // file is read at most once per cache lifetime.
    cached_events: Arc<RwLock<Option<Vec<NetLogMessage>>>>,
}
impl AOFEventSource {
pub fn new(path: PathBuf, label: Option<String>) -> Self {
Self {
path,
label,
cached_events: Arc::new(RwLock::new(None)),
}
}
}
impl EventSource for AOFEventSource {
    /// Reads every event from the backing AOF file, caching the parsed
    /// result so repeated queries avoid re-reading the file.
    async fn get_events(&self) -> Result<Vec<NetLogMessage>> {
        let guard = self.cached_events.read().await;
        if let Some(events) = guard.as_ref() {
            return Ok(events.clone());
        }
        // Release the read lock before doing file I/O and taking the write lock.
        drop(guard);
        let parsed = super::aof::LogFile::read_all_events(&self.path).await?;
        *self.cached_events.write().await = Some(parsed.clone());
        Ok(parsed)
    }

    fn get_label(&self) -> Option<String> {
        self.label.clone()
    }
}
/// Collects [`NetLogMessage`]s pushed by peers over a local WebSocket server.
pub struct WebSocketEventCollector {
    // Events received so far, appended by per-connection tasks.
    events: Arc<RwLock<Vec<NetLogMessage>>>,
    // Human-readable labels registered per peer via `register_peer_label`.
    peer_labels: Arc<DashMap<PeerId, String>>,
    // Port the server binds on 127.0.0.1.
    port: u16,
}
impl WebSocketEventCollector {
    /// Starts a WebSocket event collector listening on `127.0.0.1:port`.
    ///
    /// The listening socket is bound *before* this function returns, so
    /// callers may connect immediately. (Previously the bind happened inside
    /// the spawned server task and `new` slept 100 ms hoping the bind had
    /// completed — a race that could drop early connections.)
    ///
    /// # Errors
    /// Returns an error if the port cannot be bound.
    pub async fn new(port: u16) -> Result<Self> {
        let addr = format!("127.0.0.1:{}", port);
        let listener = tokio::net::TcpListener::bind(&addr).await?;
        let events = Arc::new(RwLock::new(Vec::new()));
        let peer_labels = Arc::new(DashMap::new());
        let collector = Self {
            events: events.clone(),
            peer_labels: peer_labels.clone(),
            port,
        };
        // The accept loop runs for the lifetime of the executor; the task is
        // detached and shares `events` with this collector.
        GlobalExecutor::spawn(Self::run_server(listener, events, peer_labels));
        Ok(collector)
    }

    /// Accept loop: upgrades each incoming TCP connection to a WebSocket and
    /// drains it on its own task, appending parsed events to `events`.
    async fn run_server(
        listener: tokio::net::TcpListener,
        events: Arc<RwLock<Vec<NetLogMessage>>>,
        _peer_labels: Arc<DashMap<PeerId, String>>,
    ) -> Result<()> {
        use futures::StreamExt;
        use tokio_tungstenite::accept_async;
        let addr = listener.local_addr()?;
        tracing::info!("WebSocket event collector listening on {}", addr);
        while let Ok((stream, _)) = listener.accept().await {
            let events = events.clone();
            GlobalExecutor::spawn(async move {
                if let Ok(ws_stream) = accept_async(stream).await {
                    let (_write, mut read) = ws_stream.split();
                    while let Some(Ok(msg)) = read.next().await {
                        // Messages that fail to parse (e.g. control frames)
                        // are skipped deliberately.
                        if let Ok(event) = Self::parse_message(msg) {
                            events.write().await.push(event);
                        }
                    }
                }
            });
        }
        Ok(())
    }

    /// Decodes a single WebSocket message into a [`NetLogMessage`].
    /// Only binary frames carry event payloads; every other frame kind is
    /// rejected with an error (callers treat that as "skip").
    fn parse_message(msg: tokio_tungstenite::tungstenite::Message) -> Result<NetLogMessage> {
        use tokio_tungstenite::tungstenite::Message;
        match msg {
            Message::Binary(data) => Self::parse_flatbuffer(&data),
            Message::Text(_)
            | Message::Ping(_)
            | Message::Pong(_)
            | Message::Close(_)
            | Message::Frame(_) => anyhow::bail!("Unexpected message type"),
        }
    }

    /// Placeholder for decoding the binary FlatBuffer event payload.
    fn parse_flatbuffer(_data: &[u8]) -> Result<NetLogMessage> {
        anyhow::bail!("FlatBuffer parsing not yet implemented - use AOF files for now")
    }

    /// Associates a human-readable label with a peer for later reporting.
    pub fn register_peer_label(&self, peer_id: PeerId, label: String) {
        self.peer_labels.insert(peer_id, label);
    }

    /// The TCP port this collector listens on.
    pub fn port(&self) -> u16 {
        self.port
    }
}
impl EventSource for WebSocketEventCollector {
    /// Snapshot of every event received over the WebSocket so far.
    async fn get_events(&self) -> Result<Vec<NetLogMessage>> {
        let snapshot = self.events.read().await;
        Ok(snapshot.clone())
    }
}
/// Adapter exposing an in-process `TestEventListener` as an [`EventSource`]
/// so tests can feed the aggregator directly.
#[derive(Clone)]
pub struct TestEventListenerSource {
    // Shared listener whose `logs` buffer is read on each query.
    listener: Arc<super::TestEventListener>,
}
impl TestEventListenerSource {
#[allow(private_interfaces)]
pub fn new(listener: Arc<super::TestEventListener>) -> Self {
Self { listener }
}
}
impl EventSource for TestEventListenerSource {
    /// Clones the listener's current log buffer.
    async fn get_events(&self) -> Result<Vec<NetLogMessage>> {
        Ok(self.listener.logs.lock().await.clone())
    }
}
/// One step in a transaction's journey: which peer saw which event, and when.
#[derive(Debug, Clone)]
pub struct TransactionFlowEvent {
    pub peer_id: PeerId,
    // Human-readable peer name, when one is known (currently always `None`
    // as emitted by `EventLogAggregator::get_transaction_flow`).
    pub peer_label: Option<String>,
    pub event_kind: EventKind,
    pub timestamp: DateTime<Utc>,
}
/// The ordered set of distinct peers a transaction traversed.
#[derive(Debug, Clone)]
pub struct RoutingPath {
    pub transaction: Transaction,
    // Peers in first-seen order, each with an optional label.
    pub path: Vec<(PeerId, Option<String>)>,
    // Time from first to last observed event; `None` when fewer than two
    // events were recorded for the transaction.
    pub duration: Option<chrono::Duration>,
}
/// Merges events from multiple [`EventSource`]s into one time-ordered view.
pub struct EventLogAggregator<S: EventSource> {
    // Sources to merge; queried in order on each uncached read.
    sources: Vec<S>,
    // Cache of the merged, sorted event list; cleared via `clear_cache`.
    cached_events: Arc<RwLock<Option<Vec<NetLogMessage>>>>,
}
impl<S: EventSource> EventLogAggregator<S> {
pub fn new(sources: Vec<S>) -> Self {
Self {
sources,
cached_events: Arc::new(RwLock::new(None)),
}
}
pub async fn get_all_events(&self) -> Result<Vec<NetLogMessage>> {
{
let cached = self.cached_events.read().await;
if let Some(events) = cached.as_ref() {
return Ok(events.clone());
}
}
let mut all_events = Vec::new();
for source in &self.sources {
let events = source.get_events().await?;
all_events.extend(events);
}
all_events.sort_by_key(|e| e.datetime);
{
let mut cached = self.cached_events.write().await;
*cached = Some(all_events.clone());
}
Ok(all_events)
}
pub async fn get_transaction_flow(
&self,
tx: &Transaction,
) -> Result<Vec<TransactionFlowEvent>> {
let all_events = self.get_all_events().await?;
let flow: Vec<TransactionFlowEvent> = all_events
.into_iter()
.filter(|event| &event.tx == tx)
.map(|event| TransactionFlowEvent {
peer_id: event.peer_id.clone(),
peer_label: None, event_kind: event.kind.clone(),
timestamp: event.datetime,
})
.collect();
Ok(flow)
}
pub async fn get_routing_path(&self, tx: &Transaction) -> Result<RoutingPath> {
let flow = self.get_transaction_flow(tx).await?;
let mut path = Vec::new();
let mut seen_peers = std::collections::HashSet::new();
for event in &flow {
if seen_peers.insert(event.peer_id.clone()) {
path.push((event.peer_id.clone(), event.peer_label.clone()));
}
}
let duration = if flow.len() >= 2 {
Some(flow.last().unwrap().timestamp - flow.first().unwrap().timestamp)
} else {
None
};
Ok(RoutingPath {
transaction: *tx,
path,
duration,
})
}
pub async fn export_mermaid_graph(&self, tx: &Transaction) -> Result<String> {
let flow = self.get_transaction_flow(tx).await?;
let mut mermaid = String::from("```mermaid\ngraph TD\n");
let mut prev_id: Option<String> = None;
for (idx, event) in flow.iter().enumerate() {
let node_id = format!("N{}", idx);
let peer_label = event
.peer_label
.clone()
.unwrap_or_else(|| format!("{:.8}", event.peer_id.to_string()));
let event_desc = format!("{:?}", event.event_kind);
mermaid.push_str(&format!(
" {}[\"{}\\n{}\"]\n",
node_id, peer_label, event_desc
));
if let Some(prev) = prev_id {
mermaid.push_str(&format!(" {} --> {}\n", prev, node_id));
}
prev_id = Some(node_id);
}
mermaid.push_str("```\n");
Ok(mermaid)
}
pub async fn clear_cache(&self) {
*self.cached_events.write().await = None;
}
}
impl EventLogAggregator<AOFEventSource> {
    /// Builds an aggregator from `(path, label)` pairs, one AOF file each.
    pub async fn from_aof_files(paths: Vec<(PathBuf, Option<String>)>) -> Result<Self> {
        let mut sources = Vec::with_capacity(paths.len());
        for (path, label) in paths {
            sources.push(AOFEventSource::new(path, label));
        }
        Ok(Self::new(sources))
    }
}
impl EventLogAggregator<WebSocketEventCollector> {
    /// Starts a WebSocket collector on `port` and wraps it as the sole source.
    pub async fn with_websocket_collector(port: u16) -> Result<Self> {
        Ok(Self::new(vec![WebSocketEventCollector::new(port).await?]))
    }
}
impl EventLogAggregator<TestEventListenerSource> {
    /// Wraps an in-process test listener in a single-source aggregator.
    #[allow(private_interfaces)]
    pub fn from_test_listener(listener: Arc<super::TestEventListener>) -> Self {
        let source = TestEventListenerSource::new(listener);
        Self::new(vec![source])
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_aggregator_creation() {
        // An aggregator with no sources yields no events.
        let aggregator: EventLogAggregator<AOFEventSource> = EventLogAggregator::new(vec![]);
        let events = aggregator.get_all_events().await.unwrap();
        assert_eq!(events.len(), 0);
    }

    #[tokio::test]
    async fn test_aggregator_with_aof_sources() {
        let temp_dir = tempfile::tempdir().unwrap();
        let first_log = temp_dir.path().join("node1_log");
        let second_log = temp_dir.path().join("node2_log");
        std::fs::write(&first_log, []).unwrap();
        std::fs::write(&second_log, []).unwrap();
        let aggregator = EventLogAggregator::<AOFEventSource>::from_aof_files(vec![
            (first_log, Some("node-1".into())),
            (second_log, Some("node-2".into())),
        ])
        .await
        .unwrap();
        // Empty files parse to zero events.
        let first_read = aggregator.get_all_events().await.unwrap();
        assert_eq!(first_read.len(), 0, "No events expected from empty logs");
        // The second read is served from the cache and must agree.
        let second_read = aggregator.get_all_events().await.unwrap();
        assert_eq!(second_read.len(), 0);
    }

    #[tokio::test]
    async fn test_aggregator_cache_clearing() {
        let temp_dir = tempfile::tempdir().unwrap();
        let log_path = temp_dir.path().join("node_log");
        std::fs::write(&log_path, []).unwrap();
        let aggregator = EventLogAggregator::<AOFEventSource>::from_aof_files(vec![(
            log_path,
            Some("node".into()),
        )])
        .await
        .unwrap();
        // Populate the cache, drop it, then re-read from disk.
        let _warmup = aggregator.get_all_events().await.unwrap();
        aggregator.clear_cache().await;
        let events = aggregator.get_all_events().await.unwrap();
        assert_eq!(events.len(), 0);
    }
}