use std::sync::Arc;
use chrono::Utc;
use either::Either;
use freenet_stdlib::prelude::*;
use futures::{FutureExt, future::BoxFuture};
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use crate::{
message::{NetMessage, NetMessageV1, Transaction},
node::PeerId,
operations::{
connect,
get::{GetMsg, GetMsgResult},
put::PutMsg,
subscribe::{SubscribeMsg, SubscribeMsgResult},
update::UpdateMsg,
},
ring::Location,
router::RouteEvent,
};
#[cfg(feature = "trace-ot")]
pub(crate) use metrics_client::OTEventRegister;
pub(crate) use test::TestEventListener;
pub use event_aggregator::{
AOFEventSource, EventLogAggregator, EventSource, RoutingPath, TransactionFlowEvent,
WebSocketEventCollector,
};
pub use state_verifier::{StateVerifier, VerificationReport};
use crate::node::OpManager;
mod aof;
pub mod event_aggregator;
pub mod telemetry;
pub use telemetry::TelemetryReporter;
pub mod state_verifier;
/// Returns the complete BLAKE3 digest of the bytes of `state`, hex-encoded.
pub fn state_hash_full(state: &WrappedState) -> String {
    blake3::hash(state.as_ref()).to_hex().to_string()
}
/// Returns a short fingerprint of `state`: the first four bytes of its BLAKE3
/// digest, each written as two lowercase hex digits (eight characters total).
pub fn state_hash_short(state: &WrappedState) -> String {
    let digest = blake3::hash(state.as_ref());
    digest.as_bytes()[..4]
        .iter()
        .map(|byte| format!("{byte:02x}"))
        .collect()
}
pub(crate) mod register;
pub(crate) mod metrics_client;
pub mod event_kind;
#[cfg(feature = "trace")]
pub mod tracer;
#[cfg(feature = "trace-ot")]
pub(crate) use register::CombinedRegister;
pub(crate) use register::{
DynamicRegister, EventRegister, ListenerLogId, NetEventLog, NetEventRegister,
};
pub use register::{EventFlushHandle, NetLogMessage};
pub(crate) use event_kind::{ConnectEvent, GetEvent, PutEvent, SubscribeEvent, UpdateEvent};
pub use event_kind::{
ConnectionType, DisconnectReason, EventKind, HostingStoppedReason, InterestSyncEvent,
OperationFailure, PeerLifecycleEvent, TransferDirection, TransferEvent,
};
pub(crate) use metrics_client::{
connect_to_metrics_server, received_from_metrics_server, send_to_metrics_server,
};
pub(crate) use register::NEW_RECORDS_TS;
pub(super) mod test {
use dashmap::DashMap;
use std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering::SeqCst},
};
use super::*;
use crate::{node::testing_impl::NodeLabel, ring::Distance, transport::TransportPublicKey};
/// Counter from which `TestEventListener::create_log` draws the next
/// `ListenerLogId`; being a `static`, it is shared by every listener instance.
static LOG_ID: AtomicUsize = AtomicUsize::new(0);
/// Event listener that keeps every registered network event in memory.
///
/// All fields are `Arc`s, so values produced by the derived `Clone` share the
/// same underlying maps, log list and metrics connection.
#[derive(Clone)]
pub(crate) struct TestEventListener {
/// Node label -> transport public key, filled in by `add_node`.
node_labels: Arc<DashMap<NodeLabel, TransportPublicKey>>,
/// For each transaction, the ids of the log entries registered for it.
tx_log: Arc<DashMap<Transaction, Vec<ListenerLogId>>>,
/// Every registered message, in the order it was pushed.
pub(crate) logs: Arc<tokio::sync::Mutex<Vec<NetLogMessage>>>,
/// Result of `connect_to_metrics_server()`; when it holds a connection, each
/// registered message is also passed to `send_to_metrics_server`.
network_metrics_server:
Arc<tokio::sync::Mutex<Option<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
}
impl TestEventListener {
    /// Builds an empty listener, awaiting `connect_to_metrics_server()` to
    /// obtain the (optional) metrics connection it will forward events to.
    pub async fn new() -> Self {
        let metrics_conn = connect_to_metrics_server().await;
        Self {
            node_labels: Arc::new(DashMap::new()),
            tx_log: Arc::new(DashMap::new()),
            logs: Arc::new(tokio::sync::Mutex::new(Vec::new())),
            network_metrics_server: Arc::new(tokio::sync::Mutex::new(metrics_conn)),
        }
    }

    /// Records the public key that belongs to `label`.
    pub fn add_node(&mut self, label: NodeLabel, peer: TransportPublicKey) {
        self.node_labels.insert(label, peer);
    }

    /// Reports whether any `Connected` event logged by `peer` has been
    /// registered.
    ///
    /// Uses `try_lock`, so it returns `false` when the log list is currently
    /// locked elsewhere.
    pub fn is_connected(&self, peer: &TransportPublicKey) -> bool {
        match self.logs.try_lock() {
            Ok(entries) => entries.iter().any(|entry| {
                matches!(
                    entry.kind,
                    EventKind::Connect(ConnectEvent::Connected { .. })
                ) && entry.peer_id.pub_key() == peer
            }),
            Err(_) => false,
        }
    }

    /// Lists the peers that `key` is recorded as connected to, paired with the
    /// ring distance between the two locations.
    ///
    /// A `Connected` event is skipped when the connected peer has no socket
    /// address, when either side has no location, when `this` is not `key`, or
    /// when a `Disconnected` event logged under the connected peer's id carries
    /// a later timestamp. Each peer appears at most once. Uses `try_lock`, so
    /// the iterator is empty when the log list is currently locked elsewhere.
    pub fn connections(
        &self,
        key: &TransportPublicKey,
    ) -> Box<dyn Iterator<Item = (PeerId, Distance)>> {
        let logs = match self.logs.try_lock() {
            Ok(guard) => guard,
            Err(_) => return Box::new([].into_iter()),
        };

        // Logging peer -> timestamps of all the `Disconnected` events it logged.
        let mut disconnects: HashMap<_, Vec<_>> = HashMap::new();
        for entry in logs.iter() {
            if matches!(entry.kind, EventKind::Disconnected { .. }) {
                disconnects
                    .entry(entry.peer_id.clone())
                    .or_default()
                    .push(entry.datetime);
            }
        }

        // Keyed by peer so that repeated connections collapse into one entry;
        // the last matching event wins.
        let mut live: HashMap<PeerId, Distance> = HashMap::new();
        for entry in logs.iter() {
            let EventKind::Connect(ConnectEvent::Connected {
                this, connected, ..
            }) = &entry.kind
            else {
                continue;
            };
            let Some(addr) = connected.socket_addr() else {
                continue;
            };
            let connected_id = PeerId::new(connected.pub_key().clone(), addr);
            let dropped_later = disconnects
                .get(&connected_id)
                .map_or(false, |times| times.iter().any(|dc| dc > &entry.datetime));
            let (Some(this_loc), Some(conn_loc)) = (this.location(), connected.location()) else {
                continue;
            };
            if this.pub_key() == key && !dropped_later {
                live.insert(connected_id, conn_loc.distance(this_loc));
            }
        }
        Box::new(live.into_iter())
    }

    /// Converts a borrowed event into an owned, timestamped `NetLogMessage`
    /// and allocates the next `ListenerLogId` for it.
    fn create_log(log: NetEventLog) -> (NetLogMessage, ListenerLogId) {
        let id = ListenerLogId(LOG_ID.fetch_add(1, SeqCst));
        let tx = *log.tx;
        let NetEventLog { peer_id, kind, .. } = log;
        let message = NetLogMessage {
            datetime: Utc::now(),
            tx,
            peer_id,
            kind,
        };
        (message, id)
    }
}
// In-memory `NetEventRegister`: events are appended to `logs`, indexed per
// transaction in `tx_log`, and forwarded to the metrics server when a
// connection is present.
impl super::NetEventRegister for TestEventListener {
/// Registers either a single event (`Either::Left`) or a batch
/// (`Either::Right`).
///
/// Each event is converted with `create_log`, passed to
/// `send_to_metrics_server` if a metrics connection is held, pushed onto
/// `logs`, and its id is appended to the `tx_log` entry of its transaction.
fn register_events<'a>(
&'a self,
logs: Either<NetEventLog<'a>, Vec<NetEventLog<'a>>>,
) -> BoxFuture<'a, ()> {
async {
match logs {
Either::Left(log) => {
let tx = log.tx;
let (msg_log, log_id) = Self::create_log(log);
// The metrics guard is a temporary of this `if let`, so it is gone
// before the log list is locked on the next statement.
if let Some(conn) = &mut *self.network_metrics_server.lock().await {
send_to_metrics_server(conn, &msg_log).await;
}
self.logs.lock().await.push(msg_log);
self.tx_log.entry(*tx).or_default().push(log_id);
}
Either::Right(logs) => {
// Batch path: the log list is locked first, then the metrics
// connection, and both guards are kept for the whole loop.
let logs_list = &mut *self.logs.lock().await;
let mut lock = self.network_metrics_server.lock().await;
for log in logs {
let tx = log.tx;
let (msg_log, log_id) = Self::create_log(log);
if let Some(conn) = &mut *lock {
send_to_metrics_server(conn, &msg_log).await;
}
logs_list.push(msg_log);
self.tx_log.entry(*tx).or_default().push(log_id);
}
}
}
}
.boxed()
}
/// Returns a boxed clone of this listener (the clone shares the same state,
/// since every field is an `Arc`).
fn trait_clone(&self) -> Box<dyn NetEventRegister> {
Box::new(self.clone())
}
/// No-op: this listener ignores timeout notifications.
fn notify_of_time_out(
&mut self,
_: Transaction,
_op_type: &str,
_target_peer: Option<String>,
) -> BoxFuture<'_, ()> {
async {}.boxed()
}
/// Always resolves to `Ok` with an empty list; `_number` is ignored.
fn get_router_events(
&self,
_number: usize,
) -> BoxFuture<'_, anyhow::Result<Vec<RouteEvent>>> {
async { Ok(vec![]) }.boxed()
}
}
/// Registers one `Connected` event per remote peer and checks that
/// `connections` reports all three with a distance inside `[0.0, 0.5]`.
#[tokio::test]
async fn test_get_connections() -> anyhow::Result<()> {
    let local = PeerId::random();
    let tx = Transaction::new::<connect::ConnectMsg>();
    let remotes = [PeerId::random(), PeerId::random(), PeerId::random()];
    let listener = TestEventListener::new().await;

    let pending = remotes.iter().map(|remote| {
        let event = NetEventLog {
            tx: &tx,
            peer_id: local.clone(),
            kind: EventKind::Connect(ConnectEvent::Connected {
                this: local.as_peer_key_location(),
                connected: remote.as_peer_key_location(),
                elapsed_ms: None,
                connection_type: ConnectionType::Direct,
                latency_ms: None,
                this_peer_connection_count: 0,
                initiated_by: None,
            }),
        };
        listener.register_events(Either::Left(event))
    });
    futures::future::join_all(pending).await;

    let distances: Vec<_> = listener.connections(local.pub_key()).collect();
    assert_eq!(distances.len(), 3, "Should have 3 connections");
    for (_, dist) in &distances {
        assert!(
            (0.0..=0.5).contains(&dist.as_f64()),
            "Distance should be valid ring distance"
        );
    }
    Ok(())
}
/// Checks length, alphabet, determinism and input sensitivity of
/// `state_hash_short`, including the empty-state case.
#[test]
fn test_state_hash_short() {
    use freenet_stdlib::prelude::WrappedState;

    let state = WrappedState::new(vec![1, 2, 3, 4, 5]);
    let first = super::state_hash_short(&state);
    assert_eq!(first.len(), 8, "Hash should be 8 hex characters");
    for ch in first.chars() {
        assert!(ch.is_ascii_hexdigit(), "Hash should only contain hex digits");
    }

    let second = super::state_hash_short(&state);
    assert_eq!(first, second, "Hash should be deterministic");

    let reversed = WrappedState::new(vec![5, 4, 3, 2, 1]);
    assert_ne!(
        first,
        super::state_hash_short(&reversed),
        "Different states should produce different hashes"
    );

    let empty_hash = super::state_hash_short(&WrappedState::new(vec![]));
    assert_eq!(
        empty_hash.len(),
        8,
        "Empty state should still produce 8-char hash"
    );
}
}
/// Per-transaction tally of GET outcomes, as filled in by
/// `summarize_get_outcomes_per_tx`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct GetOutcomeSummary {
    /// Transactions with at least one success event.
    pub successes: u64,
    /// Transactions classified as not-found.
    pub not_found: u64,
    /// Transactions classified as failures.
    pub failures: u64,
    /// Transactions classified as timeouts.
    pub timeouts: u64,
    /// Subset of `successes` whose largest reported hop count is at least 1;
    /// not part of `total()`.
    pub network_successes: u64,
    /// Elapsed time in milliseconds of each successful transaction that
    /// reported one.
    pub success_elapsed_ms: Vec<u64>,
}

impl GetOutcomeSummary {
    /// Number of classified transactions: the sum of the four mutually
    /// exclusive outcome counters (`network_successes` is excluded because it
    /// is already counted inside `successes`).
    pub fn total(&self) -> u64 {
        [
            self.successes,
            self.not_found,
            self.failures,
            self.timeouts,
        ]
        .iter()
        .sum()
    }
}
/// Elapsed-time threshold, in milliseconds, used by
/// `summarize_get_outcomes_per_tx`: a transaction with no success and no
/// `GetFailure` event whose largest reported elapsed time is at or above this
/// value is counted as a timeout instead of as not-found.
pub const GET_TIMEOUT_CLASSIFICATION_MS: u64 = 55_000;
/// Collapses GET events into one outcome per transaction.
///
/// Only `GetSuccess`, `GetNotFound` and `GetFailure` events are considered;
/// every other event is ignored. Each transaction is then classified once, in
/// this order of precedence:
///
/// 1. any `GetSuccess` -> success (also a network success when the largest
///    reported hop count is at least 1; the largest reported success elapsed
///    time, if any, is recorded);
/// 2. any `GetFailure` -> failure;
/// 3. otherwise, by the largest elapsed time reported on its non-success
///    events: at or above `GET_TIMEOUT_CLASSIFICATION_MS` -> timeout, below
///    it -> not-found, and failure when no elapsed time was reported at all.
///
/// `success_elapsed_ms` is returned sorted in ascending order.
// Non-GET events and any other GET variants are deliberately skipped with a
// wildcard arm.
#[allow(clippy::wildcard_enum_match_arm)]
pub fn summarize_get_outcomes_per_tx(logs: &[NetLogMessage]) -> GetOutcomeSummary {
    use std::collections::HashMap;

    /// Everything observed for a single transaction.
    #[derive(Default)]
    struct TxAgg {
        success: bool,
        success_elapsed: Option<u64>,
        max_hop: Option<usize>,
        saw_failure: bool,
        max_failure_elapsed: Option<u64>,
    }

    /// Stores `candidate` in `slot` unless `slot` already holds a larger value.
    fn keep_max<T: Ord + Copy>(slot: &mut Option<T>, candidate: T) {
        *slot = Some(slot.map_or(candidate, |current| current.max(candidate)));
    }

    let mut per_tx: HashMap<Transaction, TxAgg> = HashMap::new();
    for log in logs {
        let EventKind::Get(event) = &log.kind else {
            continue;
        };
        let is_failure = match event {
            GetEvent::GetSuccess { .. } => {
                let entry = per_tx.entry(log.tx).or_default();
                entry.success = true;
                if let Some(ms) = log.kind.get_elapsed_ms() {
                    keep_max(&mut entry.success_elapsed, ms);
                }
                if let Some(hops) = log.kind.hop_count() {
                    keep_max(&mut entry.max_hop, hops);
                }
                continue;
            }
            GetEvent::GetNotFound { .. } => false,
            GetEvent::GetFailure { .. } => true,
            _ => continue,
        };

        // Shared bookkeeping for the two non-success outcomes.
        let entry = per_tx.entry(log.tx).or_default();
        entry.saw_failure |= is_failure;
        if let Some(ms) = log.kind.get_elapsed_ms() {
            keep_max(&mut entry.max_failure_elapsed, ms);
        }
    }

    let mut summary = GetOutcomeSummary::default();
    for entry in per_tx.values() {
        if entry.success {
            summary.successes += 1;
            if entry.max_hop.unwrap_or(0) >= 1 {
                summary.network_successes += 1;
            }
            summary.success_elapsed_ms.extend(entry.success_elapsed);
            continue;
        }
        if entry.saw_failure {
            summary.failures += 1;
            continue;
        }
        match entry.max_failure_elapsed {
            Some(ms) if ms >= GET_TIMEOUT_CLASSIFICATION_MS => summary.timeouts += 1,
            Some(_) => summary.not_found += 1,
            None => summary.failures += 1,
        }
    }
    summary.success_elapsed_ms.sort_unstable();
    summary
}
/// Tests for `summarize_get_outcomes_per_tx`: per-transaction deduplication
/// and the precedence success > failure > timeout / not-found.
#[cfg(test)]
mod get_outcome_summary_tests {
    use super::*;
    use crate::operations::get::GetMsg;
    use crate::ring::PeerKeyLocation;
    use crate::transport::TransportPublicKey;
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey};
    use std::net::SocketAddr;

    /// Loopback address on `port`.
    fn loopback(port: u16) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], port))
    }

    /// Key whose 32 bytes all equal the low byte of `port` (the cast
    /// truncates, so ports that differ by a multiple of 256 share a key).
    fn key_for(port: u16) -> TransportPublicKey {
        TransportPublicKey::from_bytes([port as u8; 32])
    }

    fn make_peer_id(port: u16) -> PeerId {
        PeerId::new(key_for(port), loopback(port))
    }

    fn make_pkl(port: u16) -> PeerKeyLocation {
        PeerKeyLocation::new(key_for(port), loopback(port))
    }

    fn make_key() -> ContractKey {
        let instance = ContractInstanceId::new([1u8; 32]);
        let code = CodeHash::new([2u8; 32]);
        ContractKey::from_id_and_code(instance, code)
    }

    /// Fixed timestamp shared by every fixture event.
    fn base_time() -> chrono::DateTime<chrono::Utc> {
        chrono::DateTime::parse_from_rfc3339("2025-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&chrono::Utc)
    }

    /// Wraps a GET event into a log message emitted by the peer on `port`.
    fn get_log(tx: Transaction, port: u16, event: GetEvent) -> NetLogMessage {
        NetLogMessage {
            tx,
            datetime: base_time(),
            peer_id: make_peer_id(port),
            kind: EventKind::Get(event),
        }
    }

    fn not_found_event(tx: Transaction, port: u16, elapsed_ms: u64) -> NetLogMessage {
        get_log(
            tx,
            port,
            GetEvent::GetNotFound {
                id: tx,
                requester: make_pkl(port),
                instance_id: *make_key().id(),
                target: make_pkl(port),
                hop_count: Some(0),
                elapsed_ms,
                timestamp: 100,
            },
        )
    }

    fn success_event(
        tx: Transaction,
        port: u16,
        hop_count: Option<usize>,
        elapsed_ms: u64,
    ) -> NetLogMessage {
        get_log(
            tx,
            port,
            GetEvent::GetSuccess {
                id: tx,
                requester: make_pkl(port),
                target: make_pkl(port),
                key: make_key(),
                hop_count,
                elapsed_ms,
                timestamp: 100,
                state_hash: None,
            },
        )
    }

    fn failure_event(tx: Transaction, port: u16, elapsed_ms: u64) -> NetLogMessage {
        get_log(
            tx,
            port,
            GetEvent::GetFailure {
                id: tx,
                requester: make_pkl(port),
                instance_id: *make_key().id(),
                target: make_pkl(port),
                hop_count: Some(0),
                reason: OperationFailure::ConnectionDropped,
                elapsed_ms,
                timestamp: 100,
            },
        )
    }

    #[test]
    fn failed_attempt_double_registration_counts_once() {
        let tx = Transaction::new::<GetMsg>();
        let events = [not_found_event(tx, 3001, 5), not_found_event(tx, 3001, 6)];

        let outcome = summarize_get_outcomes_per_tx(&events);

        assert_eq!(
            outcome.not_found, 1,
            "double-registered NotFound must dedup"
        );
        assert_eq!(outcome.total(), 1);
    }

    #[test]
    fn multi_hop_success_counts_once_and_dominates() {
        let tx = Transaction::new::<GetMsg>();
        let events = [
            not_found_event(tx, 3003, 4),
            success_event(tx, 3002, Some(2), 10),
            success_event(tx, 3001, Some(2), 15),
        ];

        let outcome = summarize_get_outcomes_per_tx(&events);

        assert_eq!(outcome.successes, 1);
        assert_eq!(outcome.not_found, 0, "success must dominate per tx");
        assert_eq!(
            outcome.network_successes, 1,
            "hop_count >= 1 is a network success"
        );
        assert_eq!(
            outcome.success_elapsed_ms,
            vec![15],
            "originator-side (max) elapsed wins"
        );
    }

    #[test]
    fn local_hit_is_not_a_network_success() {
        let tx = Transaction::new::<GetMsg>();
        let events = [success_event(tx, 3001, Some(0), 1)];

        let outcome = summarize_get_outcomes_per_tx(&events);

        assert_eq!(outcome.successes, 1);
        assert_eq!(outcome.network_successes, 0);
    }

    #[test]
    fn get_failure_classifies_as_failure_not_not_found() {
        let tx = Transaction::new::<GetMsg>();
        let events = [failure_event(tx, 3001, 10)];

        let outcome = summarize_get_outcomes_per_tx(&events);

        assert_eq!(outcome.failures, 1, "GetFailure must land in failures");
        assert_eq!(outcome.not_found, 0);
        assert_eq!(outcome.total(), 1);
    }

    #[test]
    fn get_failure_above_timeout_threshold_stays_failure() {
        let tx = Transaction::new::<GetMsg>();
        let slow = GET_TIMEOUT_CLASSIFICATION_MS + 1_000;
        let events = [failure_event(tx, 3001, slow)];

        let outcome = summarize_get_outcomes_per_tx(&events);

        assert_eq!(
            outcome.failures, 1,
            "failure must outrank the timeout heuristic"
        );
        assert_eq!(outcome.timeouts, 0);
        assert_eq!(outcome.total(), 1);
    }

    #[test]
    fn success_dominates_failure_and_failure_dominates_not_found() {
        let recovered = Transaction::new::<GetMsg>();
        let broken = Transaction::new::<GetMsg>();
        let events = [
            failure_event(recovered, 3002, 5),
            success_event(recovered, 3001, Some(2), 20),
            not_found_event(broken, 3003, 5),
            failure_event(broken, 3003, 6),
        ];

        let outcome = summarize_get_outcomes_per_tx(&events);

        assert_eq!(outcome.successes, 1, "success must dominate failure per tx");
        assert_eq!(
            outcome.failures, 1,
            "failure must dominate not_found per tx"
        );
        assert_eq!(outcome.not_found, 0);
        assert_eq!(outcome.total(), 2);
    }

    #[test]
    fn timeout_classification_and_distinct_txs() {
        let slow_tx = Transaction::new::<GetMsg>();
        let fast_tx = Transaction::new::<GetMsg>();
        let events = [
            not_found_event(slow_tx, 3001, GET_TIMEOUT_CLASSIFICATION_MS),
            not_found_event(fast_tx, 3002, 10),
        ];

        let outcome = summarize_get_outcomes_per_tx(&events);

        assert_eq!(outcome.timeouts, 1);
        assert_eq!(outcome.not_found, 1);
        assert_eq!(outcome.total(), 2);
    }
}