use std::sync::OnceLock;
use std::time::Duration;
use tokio::time::Instant;
use either::Either;
use futures::FutureExt;
use futures::future::BoxFuture;
use serde::Serialize;
use tokio::sync::mpsc;
use crate::config::GlobalRng;
use crate::config::{GlobalExecutor, TelemetryConfig};
use crate::message::Transaction;
use crate::router::RouteEvent;
use crate::transport::TRANSPORT_METRICS;
use super::{EventKind, NetEventLog, NetEventRegister, NetLogMessage};
static TELEMETRY_SENDER: OnceLock<mpsc::Sender<TelemetryCommand>> = OnceLock::new();
pub fn send_standalone_event(event_type: &str, event_data: serde_json::Value) {
if let Some(sender) = TELEMETRY_SENDER.get() {
let event = TelemetryEvent {
timestamp: current_timestamp_ms(),
peer_id: String::new(),
transaction_id: String::new(),
event_type: event_type.to_string(),
event_data,
};
#[allow(clippy::let_underscore_must_use)]
let _ = sender.try_send(TelemetryCommand::Event(event));
}
}
const MAX_BUFFER_SIZE: usize = 100;
const BATCH_INTERVAL_SECS: u64 = 10;
const MAX_EVENTS_PER_SECOND: usize = 10;
const INITIAL_BACKOFF_MS: u64 = 1000;
const MIN_TRANSPORT_SNAPSHOT_INTERVAL_SECS: u64 = 10;
const MAX_BACKOFF_MS: u64 = 300_000;
pub fn current_timestamp_ms() -> u64 {
match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(duration) => duration.as_millis() as u64,
Err(e) => {
tracing::warn!(
error = %e,
"System time error while generating telemetry timestamp; using 0 as fallback"
);
0
}
}
}
#[derive(Clone)]
pub struct TelemetryReporter {
sender: mpsc::Sender<TelemetryCommand>,
}
#[allow(dead_code)]
enum TelemetryCommand {
Event(TelemetryEvent),
Shutdown, }
#[derive(Debug, Clone, Serialize)]
struct TelemetryEvent {
timestamp: u64,
peer_id: String,
transaction_id: String,
event_type: String,
event_data: serde_json::Value,
}
impl TelemetryReporter {
pub fn new(config: &TelemetryConfig) -> Option<Self> {
if !config.enabled {
tracing::info!("Telemetry reporting is disabled");
return None;
}
if config.is_test_environment {
tracing::info!("Telemetry disabled in test environment (--id flag detected)");
return None;
}
let endpoint = config.endpoint.clone();
let transport_snapshot_interval_secs = config.transport_snapshot_interval_secs;
tracing::info!(
endpoint = %endpoint,
transport_snapshot_interval_secs,
"Telemetry reporting enabled"
);
let (sender, receiver) = mpsc::channel(1000);
#[allow(clippy::let_underscore_must_use)]
let _ = TELEMETRY_SENDER.set(sender.clone());
let transfer_event_receiver = crate::transport::metrics::init_transfer_event_channel();
let worker = TelemetryWorker::new(
endpoint,
receiver,
transport_snapshot_interval_secs,
transfer_event_receiver,
);
GlobalExecutor::spawn(worker.run());
Some(Self { sender })
}
async fn send_event(&self, event: TelemetryEvent) {
#[allow(clippy::let_underscore_must_use)]
let _ = self.sender.try_send(TelemetryCommand::Event(event));
}
}
impl NetEventRegister for TelemetryReporter {
fn register_events<'a>(
&'a self,
logs: Either<NetEventLog<'a>, Vec<NetEventLog<'a>>>,
) -> BoxFuture<'a, ()> {
async move {
for log_msg in NetLogMessage::to_log_message(logs) {
let event = TelemetryEvent {
timestamp: current_timestamp_ms(),
peer_id: log_msg.peer_id.to_string(),
transaction_id: log_msg.tx.to_string(),
event_type: event_kind_to_string(&log_msg.kind),
event_data: event_kind_to_json(&log_msg.kind),
};
self.send_event(event).await;
}
}
.boxed()
}
fn trait_clone(&self) -> Box<dyn NetEventRegister> {
Box::new(self.clone())
}
fn notify_of_time_out(
&mut self,
tx: Transaction,
op_type: &str,
target_peer: Option<String>,
) -> BoxFuture<'_, ()> {
let sender = self.sender.clone();
let op_type = op_type.to_string();
async move {
let event = TelemetryEvent {
timestamp: current_timestamp_ms(),
peer_id: String::new(),
transaction_id: tx.to_string(),
event_type: "timeout".to_string(),
event_data: serde_json::json!({
"transaction": tx.to_string(),
"op_type": op_type,
"target_peer": target_peer,
}),
};
#[allow(clippy::let_underscore_must_use)]
let _ = sender.try_send(TelemetryCommand::Event(event));
}
.boxed()
}
fn get_router_events(&self, _number: usize) -> BoxFuture<'_, anyhow::Result<Vec<RouteEvent>>> {
async { Ok(vec![]) }.boxed()
}
}
struct TelemetryWorker {
endpoint: String,
receiver: mpsc::Receiver<TelemetryCommand>,
buffer: Vec<TelemetryEvent>,
http_client: reqwest::Client,
backoff_ms: u64,
last_send: Instant,
events_this_second: usize,
rate_limit_window_start: Instant,
transport_snapshot_interval_secs: u64,
transfer_event_receiver: mpsc::Receiver<super::TransferEvent>,
}
impl TelemetryWorker {
fn new(
endpoint: String,
receiver: mpsc::Receiver<TelemetryCommand>,
transport_snapshot_interval_secs: u64,
transfer_event_receiver: mpsc::Receiver<super::TransferEvent>,
) -> Self {
Self {
endpoint,
receiver,
buffer: Vec::with_capacity(MAX_BUFFER_SIZE),
http_client: reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("failed to build telemetry HTTP client; check TLS/proxy configuration"),
backoff_ms: 0,
last_send: Instant::now(),
events_this_second: 0,
rate_limit_window_start: Instant::now(),
transport_snapshot_interval_secs,
transfer_event_receiver,
}
}
async fn run(mut self) {
let mut batch_interval = tokio::time::interval(Duration::from_secs(BATCH_INTERVAL_SECS));
let snapshot_interval_secs = if self.transport_snapshot_interval_secs == 0 {
u64::MAX / 2 } else if self.transport_snapshot_interval_secs < MIN_TRANSPORT_SNAPSHOT_INTERVAL_SECS {
tracing::warn!(
"Transport snapshot interval {}s is below minimum {}s, using minimum",
self.transport_snapshot_interval_secs,
MIN_TRANSPORT_SNAPSHOT_INTERVAL_SECS
);
MIN_TRANSPORT_SNAPSHOT_INTERVAL_SECS
} else {
self.transport_snapshot_interval_secs
};
let mut transport_snapshot_interval =
tokio::time::interval(Duration::from_secs(snapshot_interval_secs));
loop {
crate::deterministic_select! {
cmd = self.receiver.recv() => {
match cmd {
Some(TelemetryCommand::Event(event)) => {
self.handle_event(event).await;
}
Some(TelemetryCommand::Shutdown) | None => {
self.flush().await;
break;
}
}
},
transfer_event = self.transfer_event_receiver.recv() => {
if let Some(transfer_event) = transfer_event {
let event_type = match &transfer_event {
super::TransferEvent::Started { .. } => "transfer_started",
super::TransferEvent::Completed { .. } => "transfer_completed",
super::TransferEvent::Failed { .. } => "transfer_failed",
};
let event = TelemetryEvent {
timestamp: current_timestamp_ms(),
peer_id: String::new(), transaction_id: String::new(), event_type: event_type.to_string(),
event_data: event_kind_to_json(&super::EventKind::Transfer(transfer_event)),
};
self.handle_event(event).await;
}
},
_ = batch_interval.tick() => {
self.flush().await;
},
_ = transport_snapshot_interval.tick() => {
if self.transport_snapshot_interval_secs > 0 {
if let Some(snapshot) = TRANSPORT_METRICS.take_snapshot() {
let event = TelemetryEvent {
timestamp: current_timestamp_ms(),
peer_id: String::new(), transaction_id: String::new(), event_type: "transport_snapshot".to_string(),
event_data: serde_json::to_value(&snapshot).unwrap_or_default(),
};
self.handle_event(event).await;
}
}
},
}
}
}
async fn handle_event(&mut self, event: TelemetryEvent) {
let now = Instant::now();
if now.duration_since(self.rate_limit_window_start) >= Duration::from_secs(1) {
self.rate_limit_window_start = now;
self.events_this_second = 0;
}
if self.events_this_second >= MAX_EVENTS_PER_SECOND {
return;
}
self.events_this_second += 1;
if self.buffer.len() >= MAX_BUFFER_SIZE && self.backoff_ms > 0 {
let elapsed = self.last_send.elapsed();
if elapsed < Duration::from_millis(self.backoff_ms) {
return;
}
}
self.buffer.push(event);
if self.buffer.len() >= MAX_BUFFER_SIZE {
self.flush().await;
}
}
async fn flush(&mut self) {
if self.buffer.is_empty() {
return;
}
if self.backoff_ms > 0 {
let elapsed = self.last_send.elapsed();
if elapsed < Duration::from_millis(self.backoff_ms) {
return;
}
}
let events = std::mem::take(&mut self.buffer);
self.buffer = Vec::with_capacity(MAX_BUFFER_SIZE);
match self.send_batch(&events).await {
Ok(()) => {
self.backoff_ms = 0;
self.last_send = Instant::now();
}
Err(e) => {
tracing::warn!(error = %e, "Failed to send telemetry batch, will retry with backoff");
let base_backoff = if self.backoff_ms == 0 {
INITIAL_BACKOFF_MS
} else {
(self.backoff_ms * 2).min(MAX_BACKOFF_MS)
};
let jitter = GlobalRng::random_range(0..=(base_backoff / 4));
self.backoff_ms = base_backoff + jitter;
self.last_send = Instant::now();
let remaining_capacity = MAX_BUFFER_SIZE.saturating_sub(self.buffer.len());
self.buffer
.extend(events.into_iter().take(remaining_capacity));
}
}
}
async fn send_batch(&self, events: &[TelemetryEvent]) -> Result<(), reqwest::Error> {
let otlp_payload = self.to_otlp_logs(events);
let url = format!("{}/v1/logs", self.endpoint);
self.http_client
.post(&url)
.header("Content-Type", "application/json")
.json(&otlp_payload)
.send()
.await?
.error_for_status()?;
Ok(())
}
fn to_otlp_logs(&self, events: &[TelemetryEvent]) -> serde_json::Value {
let log_records: Vec<serde_json::Value> = events
.iter()
.map(|e| {
let body_string = serde_json::to_string(&e.event_data).unwrap_or_else(|err| {
tracing::warn!(
error = %err,
event_type = %e.event_type,
"Failed to serialize telemetry event_data"
);
String::new()
});
serde_json::json!({
"timeUnixNano": e.timestamp * 1_000_000, "severityNumber": 9, "severityText": "INFO",
"body": {
"stringValue": body_string
},
"attributes": [
{
"key": "peer_id",
"value": {"stringValue": &e.peer_id}
},
{
"key": "transaction_id",
"value": {"stringValue": &e.transaction_id}
},
{
"key": "event_type",
"value": {"stringValue": &e.event_type}
}
]
})
})
.collect();
serde_json::json!({
"resourceLogs": [{
"resource": {
"attributes": [{
"key": "service.name",
"value": {"stringValue": "freenet-peer"}
}]
},
"scopeLogs": [{
"scope": {
"name": "freenet.telemetry"
},
"logRecords": log_records
}]
}]
})
}
}
fn event_kind_to_string(kind: &EventKind) -> String {
match kind {
EventKind::Connect(connect_event) => {
use super::ConnectEvent;
match connect_event {
ConnectEvent::StartConnection { .. } => "connect_start".to_string(),
ConnectEvent::Connected { .. } => "connect_connected".to_string(),
ConnectEvent::Finished { .. } => "connect_finished".to_string(),
ConnectEvent::RequestSent { .. } => "connect_request_sent".to_string(),
ConnectEvent::RequestReceived { .. } => "connect_request_received".to_string(),
ConnectEvent::ResponseSent { .. } => "connect_response_sent".to_string(),
ConnectEvent::ResponseReceived { .. } => "connect_response_received".to_string(),
ConnectEvent::Rejected { .. } => "connect_rejected".to_string(),
}
}
EventKind::Disconnected { .. } => "disconnect".to_string(),
EventKind::Put(put_event) => {
use super::PutEvent;
match put_event {
PutEvent::Request { .. } => "put_request".to_string(),
PutEvent::PutSuccess { .. } => "put_success".to_string(),
PutEvent::PutFailure { .. } => "put_failure".to_string(),
PutEvent::ResponseSent { .. } => "put_response_sent".to_string(),
PutEvent::BroadcastEmitted { .. } => "put_broadcast_emitted".to_string(),
PutEvent::BroadcastReceived { .. } => "put_broadcast_received".to_string(),
}
}
EventKind::Get(get_event) => {
use super::GetEvent;
match get_event {
GetEvent::Request { .. } => "get_request".to_string(),
GetEvent::GetSuccess { .. } => "get_success".to_string(),
GetEvent::GetNotFound { .. } => "get_not_found".to_string(),
GetEvent::GetFailure { .. } => "get_failure".to_string(),
GetEvent::ResponseSent { .. } => "get_response_sent".to_string(),
GetEvent::ForwardingAckSent { .. } => "get_forwarding_ack_sent".to_string(),
GetEvent::ForwardingAckReceived { .. } => "get_forwarding_ack_received".to_string(),
}
}
EventKind::Subscribe(subscribe_event) => {
use super::SubscribeEvent;
match subscribe_event {
SubscribeEvent::Request { .. } => "subscribe_request".to_string(),
SubscribeEvent::SubscribeSuccess { .. } => "subscribe_success".to_string(),
SubscribeEvent::SubscribeNotFound { .. } => "subscribe_not_found".to_string(),
SubscribeEvent::ResponseSent { .. } => "subscribe_response_sent".to_string(),
SubscribeEvent::HostingStarted { .. } => "hosting_started".to_string(),
SubscribeEvent::HostingStopped { .. } => "hosting_stopped".to_string(),
SubscribeEvent::_Reserved6
| SubscribeEvent::_Reserved7
| SubscribeEvent::_Reserved8
| SubscribeEvent::_Reserved9
| SubscribeEvent::_Reserved10 => "subscribe_reserved".to_string(),
SubscribeEvent::UnsubscribeSent { .. } => "unsubscribe_sent".to_string(),
SubscribeEvent::UnsubscribeReceived { .. } => "unsubscribe_received".to_string(),
}
}
EventKind::Update(update_event) => {
use super::UpdateEvent;
match update_event {
UpdateEvent::Request { .. } => "update_request".to_string(),
UpdateEvent::UpdateSuccess { .. } => "update_success".to_string(),
UpdateEvent::BroadcastEmitted { .. } => "update_broadcast_emitted".to_string(),
UpdateEvent::BroadcastComplete { .. } => "update_broadcast_complete".to_string(),
UpdateEvent::BroadcastReceived { .. } => "update_broadcast_received".to_string(),
UpdateEvent::BroadcastApplied { .. } => "update_broadcast_applied".to_string(),
UpdateEvent::UpdateFailure { .. } => "update_failure".to_string(),
UpdateEvent::BroadcastDeliverySummary { .. } => {
"update_broadcast_delivery_summary".to_string()
}
}
}
EventKind::Transfer(transfer_event) => {
use super::TransferEvent;
match transfer_event {
TransferEvent::Started { .. } => "transfer_started".to_string(),
TransferEvent::Completed { .. } => "transfer_completed".to_string(),
TransferEvent::Failed { .. } => "transfer_failed".to_string(),
}
}
EventKind::Route(route_event) => {
use crate::router::RouteOutcome;
match &route_event.outcome {
RouteOutcome::Success { .. } => "route_success".to_string(),
RouteOutcome::SuccessUntimed => "route_success_untimed".to_string(),
RouteOutcome::Failure => "route_failure".to_string(),
}
}
EventKind::Ignored => "ignored".to_string(),
EventKind::Timeout { .. } => "timeout".to_string(),
EventKind::Lifecycle(lifecycle_event) => {
use super::PeerLifecycleEvent;
match lifecycle_event {
PeerLifecycleEvent::Startup { .. } => "peer_startup".to_string(),
PeerLifecycleEvent::Shutdown { .. } => "peer_shutdown".to_string(),
}
}
EventKind::TransportSnapshot(_) => "transport_snapshot".to_string(),
EventKind::InterestSync(interest_sync_event) => {
use crate::tracing::InterestSyncEvent;
match interest_sync_event {
InterestSyncEvent::ResyncRequestReceived { .. } => {
"interest_resync_request_received".to_string()
}
InterestSyncEvent::ResyncResponseSent { .. } => {
"interest_resync_response_sent".to_string()
}
InterestSyncEvent::StateConfirmed { .. } => "interest_state_confirmed".to_string(),
}
}
EventKind::RoutingDecision(_) => "routing_decision".to_string(),
EventKind::RouterSnapshot(_) => "router_snapshot".to_string(),
}
}
fn event_kind_to_json(kind: &EventKind) -> serde_json::Value {
match kind {
EventKind::Connect(connect_event) => {
use super::ConnectEvent;
match connect_event {
ConnectEvent::StartConnection { from, is_gateway } => {
serde_json::json!({
"type": "start_connection",
"from": from.to_string(),
"is_gateway": is_gateway,
})
}
ConnectEvent::Connected {
this,
connected,
elapsed_ms,
connection_type,
latency_ms,
this_peer_connection_count,
initiated_by,
} => {
serde_json::json!({
"type": "connected",
"this_peer": this.to_string(),
"this_peer_id": this.pub_key().to_string(),
"this_peer_addr": this.peer_addr.to_string(),
"connected_peer": connected.to_string(),
"connected_peer_id": connected.pub_key().to_string(),
"connected_peer_addr": connected.peer_addr.to_string(),
"elapsed_ms": elapsed_ms,
"connection_type": connection_type.to_string(),
"latency_ms": latency_ms,
"connection_count": this_peer_connection_count,
"initiated_by": initiated_by.as_ref().map(|p| p.to_string()),
})
}
ConnectEvent::Finished {
initiator,
location,
elapsed_ms,
} => {
serde_json::json!({
"type": "finished",
"initiator": initiator.to_string(),
"location": location.as_f64(),
"elapsed_ms": elapsed_ms,
})
}
ConnectEvent::RequestSent {
desired_location,
joiner,
target,
ttl,
is_initial,
} => {
serde_json::json!({
"type": "request_sent",
"desired_location": desired_location.as_f64(),
"joiner": joiner.to_string(),
"target": target.to_string(),
"ttl": ttl,
"is_initial": is_initial,
})
}
ConnectEvent::RequestReceived {
desired_location,
joiner,
from_addr,
from_peer,
forwarded_to,
accepted,
ttl,
} => {
serde_json::json!({
"type": "request_received",
"desired_location": desired_location.as_f64(),
"joiner": joiner.to_string(),
"from_addr": from_addr.to_string(),
"from_peer": from_peer.as_ref().map(|p| p.to_string()),
"forwarded_to": forwarded_to.as_ref().map(|p| p.to_string()),
"accepted": accepted,
"ttl": ttl,
})
}
ConnectEvent::ResponseSent { acceptor, joiner } => {
serde_json::json!({
"type": "response_sent",
"acceptor": acceptor.to_string(),
"joiner": joiner.to_string(),
})
}
ConnectEvent::ResponseReceived {
acceptor,
elapsed_ms,
} => {
serde_json::json!({
"type": "response_received",
"acceptor": acceptor.to_string(),
"elapsed_ms": elapsed_ms,
})
}
ConnectEvent::Rejected {
desired_location,
reason,
} => {
serde_json::json!({
"type": "rejected",
"desired_location": desired_location.as_f64(),
"reason": reason,
})
}
}
}
EventKind::Disconnected {
from,
reason,
connection_duration_ms,
bytes_sent,
bytes_received,
} => {
serde_json::json!({
"type": "disconnected",
"from": from.to_string(),
"from_peer_id": from.pub_key().to_string(),
"from_peer_addr": from.socket_addr().to_string(),
"reason": reason.to_string(),
"connection_duration_ms": connection_duration_ms,
"bytes_sent": bytes_sent,
"bytes_received": bytes_received,
})
}
EventKind::Put(put_event) => {
use super::PutEvent;
match put_event {
PutEvent::Request {
requester,
target,
key,
id,
htl,
timestamp,
} => {
serde_json::json!({
"type": "request",
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"id": id.to_string(),
"htl": htl,
"timestamp": timestamp,
})
}
PutEvent::PutSuccess {
requester,
target,
key,
id,
hop_count,
elapsed_ms,
timestamp,
state_hash,
state_size,
} => {
let mut json = serde_json::json!({
"type": "success",
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"id": id.to_string(),
"hop_count": hop_count,
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
});
if let Some(hash) = state_hash {
json["state_hash"] = serde_json::Value::String(hash.clone());
}
if let Some(size) = state_size {
json["state_size"] = serde_json::json!(size);
}
json
}
PutEvent::BroadcastEmitted {
upstream,
broadcast_to,
broadcasted_to,
key,
sender,
id,
timestamp,
..
} => {
serde_json::json!({
"type": "broadcast_emitted",
"upstream": upstream.to_string(),
"broadcast_to": broadcast_to.iter().map(|p| p.to_string()).collect::<Vec<_>>(),
"broadcasted_to": broadcasted_to,
"key": key.to_string(),
"sender": sender.to_string(),
"id": id.to_string(),
"timestamp": timestamp,
})
}
PutEvent::BroadcastReceived {
requester,
target,
key,
id,
timestamp,
..
} => {
serde_json::json!({
"type": "broadcast_received",
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"id": id.to_string(),
"timestamp": timestamp,
})
}
PutEvent::PutFailure {
id,
requester,
target,
key,
hop_count,
reason,
elapsed_ms,
timestamp,
} => {
serde_json::json!({
"type": "failure",
"id": id.to_string(),
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"hop_count": hop_count,
"reason": reason.to_string(),
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
})
}
PutEvent::ResponseSent {
id,
from,
to,
key,
timestamp,
} => {
serde_json::json!({
"type": "response_sent",
"id": id.to_string(),
"from": from.to_string(),
"to": to.to_string(),
"key": key.to_string(),
"timestamp": timestamp,
})
}
}
}
EventKind::Get(get_event) => {
use super::GetEvent;
match get_event {
GetEvent::Request {
id,
requester,
instance_id,
target,
htl,
timestamp,
} => {
serde_json::json!({
"type": "get_request",
"id": id.to_string(),
"requester": requester.to_string(),
"instance_id": instance_id.to_string(),
"target": target.to_string(),
"htl": htl,
"timestamp": timestamp,
})
}
GetEvent::GetSuccess {
id,
requester,
target,
key,
hop_count,
elapsed_ms,
timestamp,
state_hash,
} => {
let mut json = serde_json::json!({
"type": "get_success",
"id": id.to_string(),
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"hop_count": hop_count,
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
});
if let Some(hash) = state_hash {
json["state_hash"] = serde_json::Value::String(hash.clone());
}
json
}
GetEvent::GetNotFound {
id,
requester,
instance_id,
target,
hop_count,
elapsed_ms,
timestamp,
} => {
serde_json::json!({
"type": "get_not_found",
"id": id.to_string(),
"requester": requester.to_string(),
"instance_id": instance_id.to_string(),
"target": target.to_string(),
"hop_count": hop_count,
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
})
}
GetEvent::GetFailure {
id,
requester,
instance_id,
target,
hop_count,
reason,
elapsed_ms,
timestamp,
} => {
serde_json::json!({
"type": "get_failure",
"id": id.to_string(),
"requester": requester.to_string(),
"instance_id": instance_id.to_string(),
"target": target.to_string(),
"hop_count": hop_count,
"reason": reason.to_string(),
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
})
}
GetEvent::ResponseSent {
id,
from,
to,
key,
timestamp,
} => {
let mut json = serde_json::json!({
"type": "get_response_sent",
"id": id.to_string(),
"from": from.to_string(),
"to": to.to_string(),
"timestamp": timestamp,
});
if let Some(k) = key {
json["key"] = serde_json::Value::String(k.to_string());
}
json
}
GetEvent::ForwardingAckSent {
id,
from,
to,
instance_id,
timestamp,
} => {
serde_json::json!({
"type": "get_forwarding_ack_sent",
"id": id.to_string(),
"from": from.to_string(),
"to": to.to_string(),
"instance_id": instance_id.to_string(),
"timestamp": timestamp,
})
}
GetEvent::ForwardingAckReceived {
id,
receiver,
instance_id,
elapsed_ms,
timestamp,
} => {
serde_json::json!({
"type": "get_forwarding_ack_received",
"id": id.to_string(),
"receiver": receiver.to_string(),
"instance_id": instance_id.to_string(),
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
})
}
}
}
EventKind::Subscribe(subscribe_event) => {
use super::SubscribeEvent;
match subscribe_event {
SubscribeEvent::Request {
id,
requester,
instance_id,
target,
htl,
timestamp,
} => {
serde_json::json!({
"type": "subscribe_request",
"id": id.to_string(),
"requester": requester.to_string(),
"instance_id": instance_id.to_string(),
"target": target.to_string(),
"htl": htl,
"timestamp": timestamp,
})
}
SubscribeEvent::SubscribeSuccess {
id,
key,
at,
hop_count,
elapsed_ms,
timestamp,
requester,
} => {
serde_json::json!({
"type": "subscribe_success",
"id": id.to_string(),
"key": key.to_string(),
"at": at.to_string(),
"hop_count": hop_count,
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
"requester": requester.to_string(),
})
}
SubscribeEvent::SubscribeNotFound {
id,
requester,
instance_id,
target,
hop_count,
elapsed_ms,
timestamp,
} => {
serde_json::json!({
"type": "subscribe_not_found",
"id": id.to_string(),
"requester": requester.to_string(),
"instance_id": instance_id.to_string(),
"target": target.to_string(),
"hop_count": hop_count,
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
})
}
SubscribeEvent::ResponseSent {
id,
from,
to,
key,
timestamp,
} => {
let mut json = serde_json::json!({
"type": "subscribe_response_sent",
"id": id.to_string(),
"from": from.to_string(),
"to": to.to_string(),
"timestamp": timestamp,
});
if let Some(k) = key {
json["key"] = serde_json::Value::String(k.to_string());
}
json
}
SubscribeEvent::HostingStarted {
instance_id,
timestamp,
} => {
serde_json::json!({
"type": "hosting_started",
"instance_id": instance_id.to_string(),
"timestamp": timestamp,
})
}
SubscribeEvent::HostingStopped {
instance_id,
reason,
timestamp,
} => {
serde_json::json!({
"type": "hosting_stopped",
"instance_id": instance_id.to_string(),
"reason": format!("{:?}", reason),
"timestamp": timestamp,
})
}
SubscribeEvent::_Reserved6
| SubscribeEvent::_Reserved7
| SubscribeEvent::_Reserved8
| SubscribeEvent::_Reserved9
| SubscribeEvent::_Reserved10 => serde_json::json!({"type": "reserved"}),
SubscribeEvent::UnsubscribeSent {
id,
instance_id,
from,
to,
timestamp,
} => {
serde_json::json!({
"type": "unsubscribe_sent",
"id": id.to_string(),
"instance_id": instance_id.to_string(),
"from": from.to_string(),
"to": to.to_string(),
"timestamp": timestamp,
})
}
SubscribeEvent::UnsubscribeReceived {
id,
instance_id,
from,
at,
timestamp,
} => {
serde_json::json!({
"type": "unsubscribe_received",
"id": id.to_string(),
"instance_id": instance_id.to_string(),
"from": from.to_string(),
"at": at.to_string(),
"timestamp": timestamp,
})
}
}
}
EventKind::Update(update_event) => {
use super::UpdateEvent;
match update_event {
UpdateEvent::Request {
requester,
target,
key,
id,
timestamp,
} => {
serde_json::json!({
"type": "request",
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"id": id.to_string(),
"timestamp": timestamp,
})
}
UpdateEvent::UpdateSuccess {
requester,
target,
key,
id,
timestamp,
state_hash_before,
state_hash_after,
state_size,
} => {
let mut json = serde_json::json!({
"type": "success",
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"id": id.to_string(),
"timestamp": timestamp,
});
if let Some(hash) = state_hash_before {
json["state_hash_before"] = serde_json::Value::String(hash.clone());
}
if let Some(hash) = state_hash_after {
json["state_hash_after"] = serde_json::Value::String(hash.clone());
}
if let Some(size) = state_size {
json["state_size"] = serde_json::json!(size);
}
json
}
UpdateEvent::BroadcastEmitted {
upstream,
broadcast_to,
broadcasted_to,
key,
sender,
id,
timestamp,
..
} => {
serde_json::json!({
"type": "broadcast_emitted",
"upstream": upstream.to_string(),
"broadcast_to": broadcast_to.iter().map(|p| p.to_string()).collect::<Vec<_>>(),
"broadcasted_to": broadcasted_to,
"key": key.to_string(),
"sender": sender.to_string(),
"id": id.to_string(),
"timestamp": timestamp,
})
}
UpdateEvent::BroadcastComplete {
id,
key,
delta_sends,
full_state_sends,
bytes_saved,
state_size,
timestamp,
} => {
serde_json::json!({
"type": "broadcast_complete",
"id": id.to_string(),
"key": key.to_string(),
"delta_sends": delta_sends,
"full_state_sends": full_state_sends,
"bytes_saved": bytes_saved,
"state_size": state_size,
"delta_ratio": if *delta_sends + *full_state_sends > 0 {
*delta_sends as f64 / (*delta_sends + *full_state_sends) as f64
} else {
0.0
},
"timestamp": timestamp,
})
}
UpdateEvent::BroadcastReceived {
requester,
target,
key,
id,
timestamp,
..
} => {
serde_json::json!({
"type": "broadcast_received",
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"id": id.to_string(),
"timestamp": timestamp,
})
}
UpdateEvent::BroadcastApplied {
id,
key,
target,
timestamp,
state_hash_before,
state_hash_after,
changed,
state_size,
} => {
let mut json = serde_json::json!({
"type": "broadcast_applied",
"id": id.to_string(),
"key": key.to_string(),
"target": target.to_string(),
"timestamp": timestamp,
"changed": changed,
"state_size": state_size,
});
if let Some(hash) = state_hash_before {
json["state_hash_before"] = serde_json::Value::String(hash.clone());
}
if let Some(hash) = state_hash_after {
json["state_hash_after"] = serde_json::Value::String(hash.clone());
}
json
}
UpdateEvent::BroadcastDeliverySummary {
key,
proximity_found,
proximity_resolve_failed,
interest_found,
interest_resolve_failed,
skipped_self,
skipped_sender,
skipped_summary_match,
targets_sent,
send_failed,
timestamp,
} => {
serde_json::json!({
"type": "broadcast_delivery_summary",
"key": key.to_string(),
"proximity_found": proximity_found,
"proximity_resolve_failed": proximity_resolve_failed,
"interest_found": interest_found,
"interest_resolve_failed": interest_resolve_failed,
"skipped_self": skipped_self,
"skipped_sender": skipped_sender,
"skipped_summary_match": skipped_summary_match,
"targets_sent": targets_sent,
"send_failed": send_failed,
"timestamp": timestamp,
})
}
UpdateEvent::UpdateFailure {
id,
requester,
target,
key,
reason,
elapsed_ms,
timestamp,
} => {
serde_json::json!({
"type": "update_failure",
"id": id.to_string(),
"requester": requester.to_string(),
"target": target.to_string(),
"key": key.to_string(),
"reason": reason.to_string(),
"elapsed_ms": elapsed_ms,
"timestamp": timestamp,
})
}
}
}
EventKind::Transfer(transfer_event) => {
use super::TransferEvent;
match transfer_event {
TransferEvent::Started {
stream_id,
peer_addr,
expected_bytes,
direction,
tx_id,
timestamp,
} => {
serde_json::json!({
"type": "started",
"stream_id": stream_id,
"peer_addr": peer_addr.to_string(),
"expected_bytes": expected_bytes,
"direction": format!("{:?}", direction),
"tx_id": tx_id.as_ref().map(|t| t.to_string()),
"timestamp": timestamp,
})
}
TransferEvent::Completed {
stream_id,
peer_addr,
bytes_transferred,
elapsed_ms,
avg_throughput_bps,
peak_cwnd_bytes,
final_cwnd_bytes,
slowdowns_triggered,
final_srtt_ms,
final_ssthresh_bytes,
min_ssthresh_floor_bytes,
total_timeouts,
direction,
timestamp,
} => {
serde_json::json!({
"type": "completed",
"stream_id": stream_id,
"peer_addr": peer_addr.to_string(),
"bytes_transferred": bytes_transferred,
"elapsed_ms": elapsed_ms,
"avg_throughput_bps": avg_throughput_bps,
"peak_cwnd_bytes": peak_cwnd_bytes,
"final_cwnd_bytes": final_cwnd_bytes,
"slowdowns_triggered": slowdowns_triggered,
"final_srtt_ms": final_srtt_ms,
"final_ssthresh_bytes": final_ssthresh_bytes,
"min_ssthresh_floor_bytes": min_ssthresh_floor_bytes,
"total_timeouts": total_timeouts,
"direction": format!("{:?}", direction),
"timestamp": timestamp,
})
}
TransferEvent::Failed {
stream_id,
peer_addr,
bytes_transferred,
reason,
elapsed_ms,
direction,
timestamp,
} => {
serde_json::json!({
"type": "failed",
"stream_id": stream_id,
"peer_addr": peer_addr.to_string(),
"bytes_transferred": bytes_transferred,
"reason": reason,
"elapsed_ms": elapsed_ms,
"direction": format!("{:?}", direction),
"timestamp": timestamp,
})
}
}
}
EventKind::Route(route_event) => {
use crate::router::RouteOutcome;
let distance = route_event
.peer
.location()
.map(|l| route_event.contract_location.distance(l).as_f64());
match &route_event.outcome {
RouteOutcome::Success {
time_to_response_start,
payload_size,
payload_transfer_time,
} => {
serde_json::json!({
"type": "route_success",
"peer_location": route_event.peer.location().map(|l| l.as_f64()),
"contract_location": route_event.contract_location.as_f64(),
"distance": distance,
"time_to_response_start_ms": time_to_response_start.as_millis() as u64,
"payload_size": payload_size,
"payload_transfer_time_ms": payload_transfer_time.as_millis() as u64,
})
}
RouteOutcome::SuccessUntimed | RouteOutcome::Failure => {
let type_str = match &route_event.outcome {
RouteOutcome::SuccessUntimed => "route_success_untimed",
RouteOutcome::Success { .. } | RouteOutcome::Failure => "route_failure",
};
serde_json::json!({
"type": type_str,
"peer_location": route_event.peer.location().map(|l| l.as_f64()),
"contract_location": route_event.contract_location.as_f64(),
"distance": distance,
})
}
}
}
EventKind::Ignored => {
serde_json::json!({"type": "ignored"})
}
EventKind::Timeout {
id,
timestamp,
op_type,
target_peer,
} => {
serde_json::json!({
"type": "timeout",
"id": id.to_string(),
"timestamp": timestamp,
"op_type": op_type,
"target_peer": target_peer,
})
}
EventKind::Lifecycle(lifecycle_event) => {
use super::PeerLifecycleEvent;
match lifecycle_event {
PeerLifecycleEvent::Startup {
version,
git_commit,
git_dirty,
arch,
os,
os_version,
is_gateway,
timestamp,
} => {
serde_json::json!({
"type": "startup",
"version": version,
"git_commit": git_commit,
"git_dirty": git_dirty,
"arch": arch,
"os": os,
"os_version": os_version,
"is_gateway": is_gateway,
"timestamp": timestamp,
})
}
PeerLifecycleEvent::Shutdown {
graceful,
reason,
uptime_secs,
total_connections,
timestamp,
} => {
serde_json::json!({
"type": "shutdown",
"graceful": graceful,
"reason": reason,
"uptime_secs": uptime_secs,
"total_connections": total_connections,
"timestamp": timestamp,
})
}
}
}
EventKind::TransportSnapshot(snapshot) => {
serde_json::json!({
"type": "transport_snapshot",
"transfers_completed": snapshot.transfers_completed,
"transfers_failed": snapshot.transfers_failed,
"bytes_sent": snapshot.bytes_sent,
"bytes_received": snapshot.bytes_received,
"avg_transfer_time_ms": snapshot.avg_transfer_time_ms,
"peak_throughput_bps": snapshot.peak_throughput_bps,
"avg_cwnd_bytes": snapshot.avg_cwnd_bytes,
"peak_cwnd_bytes": snapshot.peak_cwnd_bytes,
"min_cwnd_bytes": snapshot.min_cwnd_bytes,
"slowdowns_triggered": snapshot.slowdowns_triggered,
"avg_rtt_us": snapshot.avg_rtt_us,
"min_rtt_us": snapshot.min_rtt_us,
"max_rtt_us": snapshot.max_rtt_us,
})
}
EventKind::InterestSync(interest_sync_event) => {
use crate::tracing::InterestSyncEvent;
match interest_sync_event {
InterestSyncEvent::ResyncRequestReceived {
key,
from_peer,
timestamp,
} => {
serde_json::json!({
"type": "resync_request_received",
"contract_key": key.to_string(),
"contract_id": key.id().to_string(),
"from_peer": from_peer.to_string(),
"from_peer_addr": from_peer.peer_addr.to_string(),
"timestamp": timestamp,
})
}
InterestSyncEvent::ResyncResponseSent {
key,
to_peer,
state_size,
timestamp,
} => {
serde_json::json!({
"type": "resync_response_sent",
"contract_key": key.to_string(),
"contract_id": key.id().to_string(),
"to_peer": to_peer.to_string(),
"to_peer_addr": to_peer.peer_addr.to_string(),
"state_size": state_size,
"timestamp": timestamp,
})
}
InterestSyncEvent::StateConfirmed { key, state_hash } => {
serde_json::json!({
"type": "state_confirmed",
"contract_key": key.to_string(),
"contract_id": key.id().to_string(),
"state_hash": state_hash,
})
}
}
}
EventKind::RoutingDecision(decision) => {
serde_json::json!({
"type": "routing_decision",
"target_location": decision.target_location,
"strategy": format!("{:?}", decision.strategy),
"num_candidates": decision.candidates.len(),
"total_routing_events": decision.total_routing_events,
"candidates": decision.candidates.iter().map(|c| {
serde_json::json!({
"distance": c.distance,
"selected": c.selected,
"failure_probability": c.prediction.as_ref().map(|p| p.failure_probability),
"time_to_response_start": c.prediction.as_ref().map(|p| p.time_to_response_start),
"expected_total_time": c.prediction.as_ref().map(|p| p.expected_total_time),
"transfer_speed_bps": c.prediction.as_ref().map(|p| p.transfer_speed_bps),
})
}).collect::<Vec<_>>(),
})
}
EventKind::RouterSnapshot(snapshot) => {
serde_json::json!({
"type": "router_snapshot",
"failure_events": snapshot.failure_events,
"success_events": snapshot.success_events,
"transfer_rate_events": snapshot.transfer_rate_events,
"prediction_active": snapshot.prediction_active,
"mean_transfer_size_bytes": snapshot.mean_transfer_size_bytes,
"consider_n_closest_peers": snapshot.consider_n_closest_peers,
"peers_with_failure_adjustments": snapshot.peers_with_failure_adjustments,
"peers_with_response_adjustments": snapshot.peers_with_response_adjustments,
"failure_curve": snapshot.failure_curve,
"response_time_curve": snapshot.response_time_curve,
"transfer_rate_curve": snapshot.transfer_rate_curve,
"connect_forward_curve": snapshot.connect_forward_curve,
"connect_forward_events": snapshot.connect_forward_events,
"connect_forward_peer_adjustments": snapshot.connect_forward_peer_adjustments,
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backoff_calculation() {
let mut backoff_ms: u64 = 0;
backoff_ms = if backoff_ms == 0 {
INITIAL_BACKOFF_MS
} else {
(backoff_ms * 2).min(MAX_BACKOFF_MS)
};
assert_eq!(backoff_ms, 1000);
backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
assert_eq!(backoff_ms, 2000);
backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
assert_eq!(backoff_ms, 4000);
backoff_ms = MAX_BACKOFF_MS + 1000;
backoff_ms = backoff_ms.min(MAX_BACKOFF_MS);
assert_eq!(backoff_ms, MAX_BACKOFF_MS);
}
#[test]
fn test_event_kind_to_string() {
let ignored = EventKind::Ignored;
assert_eq!(event_kind_to_string(&ignored), "ignored");
}
#[test]
fn test_event_kind_to_string_interest_sync() {
use crate::ring::PeerKeyLocation;
use crate::tracing::InterestSyncEvent;
use freenet_stdlib::prelude::{ContractCode, ContractKey, Parameters};
let code = ContractCode::from(vec![1, 2, 3, 4]);
let params = Parameters::from(vec![5, 6, 7, 8]);
let key = ContractKey::from_params_and_code(¶ms, &code);
let peer = PeerKeyLocation::random();
let event = EventKind::InterestSync(InterestSyncEvent::ResyncRequestReceived {
key,
from_peer: peer.clone(),
timestamp: 12345,
});
assert_eq!(
event_kind_to_string(&event),
"interest_resync_request_received"
);
let event = EventKind::InterestSync(InterestSyncEvent::ResyncResponseSent {
key,
to_peer: peer,
state_size: 1024,
timestamp: 12345,
});
assert_eq!(
event_kind_to_string(&event),
"interest_resync_response_sent"
);
}
#[test]
fn test_event_kind_to_json_interest_sync() {
use crate::ring::PeerKeyLocation;
use crate::tracing::InterestSyncEvent;
use freenet_stdlib::prelude::{ContractCode, ContractKey, Parameters};
let code = ContractCode::from(vec![1, 2, 3, 4]);
let params = Parameters::from(vec![5, 6, 7, 8]);
let key = ContractKey::from_params_and_code(¶ms, &code);
let peer = PeerKeyLocation::random();
let event = EventKind::InterestSync(InterestSyncEvent::ResyncRequestReceived {
key,
from_peer: peer.clone(),
timestamp: 12345,
});
let json = event_kind_to_json(&event);
assert_eq!(json["type"], "resync_request_received");
assert!(json["contract_key"].is_string());
assert!(json["contract_id"].is_string());
assert!(json["from_peer"].is_string());
assert_eq!(json["timestamp"], 12345);
let event = EventKind::InterestSync(InterestSyncEvent::ResyncResponseSent {
key,
to_peer: peer,
state_size: 1024,
timestamp: 67890,
});
let json = event_kind_to_json(&event);
assert_eq!(json["type"], "resync_response_sent");
assert!(json["contract_key"].is_string());
assert!(json["contract_id"].is_string());
assert!(json["to_peer"].is_string());
assert_eq!(json["state_size"], 1024);
assert_eq!(json["timestamp"], 67890);
}
#[test]
fn test_event_kind_to_json_timeout_enriched() {
use crate::message::Transaction;
let tx = Transaction::new::<crate::operations::connect::ConnectMsg>();
let event = EventKind::Timeout {
id: tx,
timestamp: 99999,
op_type: "get".to_string(),
target_peer: Some("peer-abc-123".to_string()),
};
let json = event_kind_to_json(&event);
assert_eq!(json["type"], "timeout");
assert_eq!(json["timestamp"], 99999);
assert_eq!(json["op_type"], "get");
assert_eq!(json["target_peer"], "peer-abc-123");
let event_no_peer = EventKind::Timeout {
id: tx,
timestamp: 88888,
op_type: "put".to_string(),
target_peer: None,
};
let json2 = event_kind_to_json(&event_no_peer);
assert_eq!(json2["op_type"], "put");
assert!(json2["target_peer"].is_null());
}
#[test]
fn test_send_standalone_event_no_panic_without_init() {
send_standalone_event(
"subscription_renewal_outcome",
serde_json::json!({
"contract": "test-contract",
"outcome": "success",
"error": null,
}),
);
}
#[test]
fn test_event_kind_to_json_connect_rejected() {
use crate::ring::Location;
use crate::tracing::ConnectEvent;
let loc = Location::new(0.42);
let event = EventKind::Connect(ConnectEvent::Rejected {
desired_location: loc,
reason: "ring full".to_string(),
});
assert_eq!(event_kind_to_string(&event), "connect_rejected");
let json = event_kind_to_json(&event);
assert_eq!(json["type"], "rejected");
assert!((json["desired_location"].as_f64().unwrap() - 0.42).abs() < 1e-6);
assert_eq!(json["reason"], "ring full");
}
#[test]
fn test_event_kind_to_json_update_failure() {
use crate::message::Transaction;
use crate::ring::PeerKeyLocation;
use crate::tracing::{OperationFailure, UpdateEvent};
use freenet_stdlib::prelude::{ContractCode, ContractKey, Parameters};
let tx = Transaction::new::<crate::operations::update::UpdateMsg>();
let requester = PeerKeyLocation::random();
let target = PeerKeyLocation::random();
let code = ContractCode::from(vec![10, 20, 30]);
let params = Parameters::from(vec![40, 50, 60]);
let key = ContractKey::from_params_and_code(¶ms, &code);
let event = EventKind::Update(UpdateEvent::UpdateFailure {
id: tx,
requester: requester.clone(),
target: target.clone(),
key,
reason: OperationFailure::HtlExhausted,
elapsed_ms: 1234,
timestamp: 99999,
});
assert_eq!(event_kind_to_string(&event), "update_failure");
let json = event_kind_to_json(&event);
assert_eq!(json["type"], "update_failure");
assert_eq!(json["id"], tx.to_string());
assert_eq!(json["requester"], requester.to_string());
assert_eq!(json["target"], target.to_string());
assert!(json["key"].is_string());
assert_eq!(json["reason"], "htl_exhausted");
assert_eq!(json["elapsed_ms"], 1234);
assert_eq!(json["timestamp"], 99999);
}
}