#![cfg_attr(not(feature = "trace"), allow(dead_code, missing_docs))]
use super::event::{Event, TraceId};
use std::ptr;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
#[cfg(feature = "trace")]
use dashmap::DashMap;
/// Compile-time configuration for the trace event ring buffer.
pub struct TraceConfig;
impl TraceConfig {
/// Number of slots in the event ring buffer. Must be a power of two
/// (enforced by the `const _` assertion below) so masking works.
pub const BUFFER_SIZE: usize = 65536;
/// Mask mapping a monotonically increasing write index onto a slot
/// (`idx & BUFFER_MASK` == `idx % BUFFER_SIZE` for powers of two).
pub const BUFFER_MASK: usize = Self::BUFFER_SIZE - 1;
}
/// Fixed-capacity, lock-free ring buffer of trace `Event`s.
///
/// Writers claim a slot through an atomic counter and store the event with a
/// volatile write; readers use volatile reads. See `log` for the concurrency
/// caveats of this scheme.
pub struct EventLog {
// Ring storage; `UnsafeCell` gives interior mutability so `log` can take `&self`.
events: Box<[std::cell::UnsafeCell<Event>; TraceConfig::BUFFER_SIZE]>,
// Total number of events ever logged; the low bits select the slot.
write_index: AtomicU64,
// Monotonic counter stamped into each event's `sequence` field.
sequence_counter: AtomicU32,
// Secondary lookup indices, maintained only with the "trace" feature.
#[cfg(feature = "trace")]
indices: EventIndices,
}
#[cfg(feature = "trace")]
/// Secondary indices mapping trace ids and peer ids to ring-buffer slots.
///
/// NOTE(review): entries are never pruned, so after the ring wraps an indexed
/// slot may hold a newer, unrelated event — confirm this staleness is accepted.
struct EventIndices {
// Slot numbers (as u32) of events belonging to each trace id.
by_trace: DashMap<TraceId, Vec<u32>>,
// Slot numbers of peer-related events, keyed by 32-byte peer id.
by_peer: DashMap<[u8; 32], Vec<u32>>,
}
const _: () = assert!(TraceConfig::BUFFER_SIZE.count_ones() == 1);
impl EventLog {
    /// Creates an empty log with every slot initialized to `Event::default()`.
    pub fn new() -> Self {
        let events: Box<[std::cell::UnsafeCell<Event>]> = (0..TraceConfig::BUFFER_SIZE)
            .map(|_| std::cell::UnsafeCell::new(Event::default()))
            .collect();
        // The iterator above yields exactly BUFFER_SIZE elements, so the
        // checked slice-to-array conversion cannot fail. This replaces the
        // previous unchecked `Box::from_raw` pointer cast.
        let events: Box<[std::cell::UnsafeCell<Event>; TraceConfig::BUFFER_SIZE]> =
            match events.try_into() {
                Ok(array) => array,
                Err(_) => unreachable!("collected exactly BUFFER_SIZE slots"),
            };
        Self {
            events,
            write_index: AtomicU64::new(0),
            sequence_counter: AtomicU32::new(0),
            #[cfg(feature = "trace")]
            indices: EventIndices {
                by_trace: DashMap::new(),
                by_peer: DashMap::new(),
            },
        }
    }

    /// Stamps `event` with a sequence number and appends it to the ring,
    /// overwriting the oldest entry once the buffer has wrapped.
    pub fn log(&self, mut event: Event) {
        event.sequence = self.sequence_counter.fetch_add(1, Ordering::Relaxed);
        // Claim a unique index; its low bits select the slot.
        let idx = self.write_index.fetch_add(1, Ordering::Relaxed);
        let slot = (idx & TraceConfig::BUFFER_MASK as u64) as usize;
        // SAFETY: this thread exclusively claimed `idx`, and another writer
        // can only reach the same slot after BUFFER_SIZE interleaved logs.
        // NOTE(review): readers may race this volatile write and observe a
        // torn event; volatile ops are not atomics, so this is formally a
        // data race under the Rust memory model — confirm this lock-free
        // trade-off is intended.
        unsafe {
            let ptr = self.events[slot].get();
            // Clone only when the indices still need the event afterwards;
            // without the "trace" feature the event is moved, not cloned.
            #[cfg(feature = "trace")]
            ptr::write_volatile(ptr, event.clone());
            #[cfg(not(feature = "trace"))]
            ptr::write_volatile(ptr, event);
        }
        #[cfg(feature = "trace")]
        self.update_indices(slot, &event);
    }

    /// Records `slot` in the trace-id index and, for peer-related events,
    /// in the peer index as well.
    #[cfg(feature = "trace")]
    fn update_indices(&self, slot: usize, event: &Event) {
        self.indices
            .by_trace
            .entry(event.trace_id)
            .or_default()
            .push(slot as u32);
        use super::event::EventData;
        match &event.event_data {
            EventData::HolePunchingStarted { peer, .. }
            | EventData::HolePunchingSucceeded { peer, .. }
            | EventData::ObservedAddressReceived {
                from_peer: peer, ..
            } => {
                self.indices
                    .by_peer
                    .entry(*peer)
                    .or_default()
                    .push(slot as u32);
            }
            _ => {}
        }
    }

    /// Returns up to `count` of the most recently logged events, newest
    /// first. Stops early at a never-written slot (timestamp 0).
    pub fn recent_events(&self, count: usize) -> Vec<Event> {
        let current_idx = self.write_index.load(Ordering::Relaxed);
        // Never scan more slots than have been written or than exist.
        let scan_count = count
            .min(current_idx as usize)
            .min(TraceConfig::BUFFER_SIZE);
        let mut events = Vec::with_capacity(scan_count);
        for i in 0..scan_count {
            // scan_count <= current_idx, so this subtraction cannot underflow.
            let idx = current_idx - (i as u64 + 1);
            let slot = (idx & TraceConfig::BUFFER_MASK as u64) as usize;
            // SAFETY: slot is in bounds; the volatile read may race a
            // concurrent writer (see `log`).
            let event = unsafe { ptr::read_volatile(self.events[slot].get()) };
            if event.timestamp == 0 {
                break;
            }
            events.push(event);
        }
        events
    }

    /// Returns all indexed events for `trace_id`, in logging order.
    ///
    /// NOTE(review): after the ring wraps, an indexed slot may have been
    /// overwritten by a newer event belonging to a different trace — the
    /// index is never pruned (see `EventIndices`).
    #[cfg(feature = "trace")]
    pub fn query_trace(&self, trace_id: TraceId) -> Vec<Event> {
        match self.indices.by_trace.get(&trace_id) {
            Some(slots) => slots
                .iter()
                // SAFETY: stored slot numbers are always < BUFFER_SIZE; the
                // volatile read may race a writer (see `log`).
                .map(|&slot| unsafe { ptr::read_volatile(self.events[slot as usize].get()) })
                .collect(),
            None => Vec::new(),
        }
    }

    /// Fallback used when the "trace" feature (and thus the index) is
    /// disabled: linearly scans written slots backwards and keeps events
    /// whose trace id matches. Result is newest-first.
    #[cfg(not(feature = "trace"))]
    pub(super) fn query_trace(&self, trace_id: TraceId) -> Vec<Event> {
        let current_idx = self.write_index.load(Ordering::Relaxed);
        let scan_count = current_idx.min(TraceConfig::BUFFER_SIZE as u64);
        let mut events = Vec::new();
        for i in 0..scan_count {
            // scan_count <= current_idx, so this cannot underflow.
            let idx = current_idx - (i + 1);
            let slot = (idx & TraceConfig::BUFFER_MASK as u64) as usize;
            // SAFETY: slot is in bounds; may race a writer (see `log`).
            let event = unsafe { ptr::read_volatile(self.events[slot].get()) };
            if event.timestamp == 0 {
                break;
            }
            if event.trace_id == trace_id {
                events.push(event);
            }
        }
        events
    }

    /// Returns events whose timestamp satisfies `start <= t <= end`,
    /// oldest first.
    ///
    /// Assumes timestamps are non-decreasing in log order: the backward
    /// scan stops at the first event older than `start`.
    pub(super) fn query_time_range(&self, start: u64, end: u64) -> Vec<Event> {
        let current_idx = self.write_index.load(Ordering::Relaxed);
        // Bound the scan by the number of events actually written. The old
        // unbounded `0..BUFFER_SIZE` loop saturated the index to 0 once
        // `i + 1 > current_idx` and could push slot 0's event repeatedly,
        // producing duplicated results on a partially filled buffer.
        let scan_count = current_idx.min(TraceConfig::BUFFER_SIZE as u64);
        let mut events = Vec::new();
        for i in 0..scan_count {
            let idx = current_idx - (i + 1);
            let slot = (idx & TraceConfig::BUFFER_MASK as u64) as usize;
            // SAFETY: slot is in bounds; may race a writer (see `log`).
            let event = unsafe { ptr::read_volatile(self.events[slot].get()) };
            if event.timestamp == 0 || event.timestamp < start {
                break;
            }
            if event.timestamp <= end {
                events.push(event);
            }
        }
        // The scan ran newest-to-oldest; present results chronologically.
        events.reverse();
        events
    }

    /// Total number of events ever logged (not capped at BUFFER_SIZE).
    pub(super) fn event_count(&self) -> u64 {
        self.write_index.load(Ordering::Relaxed)
    }

    /// Alias for [`Self::query_trace`].
    pub fn get_events_by_trace(&self, trace_id: TraceId) -> Vec<Event> {
        self.query_trace(trace_id)
    }

    /// Alias for [`Self::recent_events`].
    pub(super) fn get_recent_events(&self, count: usize) -> Vec<Event> {
        self.recent_events(count)
    }

    /// Alias for [`Self::query_time_range`].
    pub(super) fn get_events_in_range(&self, start: u64, end: u64) -> Vec<Event> {
        self.query_time_range(start, end)
    }
}
// SAFETY: the `UnsafeCell` slots are only accessed through volatile
// reads/writes coordinated by the atomic `write_index`; the DashMap indices
// are internally synchronized.
// NOTE(review): concurrent volatile read/write of the same slot is still a
// data race under the Rust memory model (volatile is not atomic) — confirm
// this best-effort, possibly-torn-read design is intentional.
unsafe impl Send for EventLog {}
unsafe impl Sync for EventLog {}
#[cfg(all(test, feature = "trace"))]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// The newest events come back first from `recent_events`.
    #[test]
    fn test_ring_buffer_basic() {
        let log = EventLog::new();
        let trace_id = TraceId::new();
        for i in 0..10 {
            log.log(Event::packet_sent(100 + i, i as u64, trace_id));
        }
        let recent = log.recent_events(5);
        assert_eq!(recent.len(), 5);
        if let crate::tracing::event::EventData::PacketSent { packet_num, .. } =
            &recent[0].event_data
        {
            assert_eq!(*packet_num, 9);
        } else {
            panic!("Wrong event type");
        }
    }

    /// Four threads logging 100 events each are all accounted for.
    #[test]
    fn test_concurrent_logging() {
        let log = Arc::new(EventLog::new());
        let handles: Vec<_> = (0..4)
            .map(|thread_id| {
                let log = Arc::clone(&log);
                thread::spawn(move || {
                    let trace_id = TraceId::new();
                    for i in 0..100 {
                        log.log(Event::packet_sent(thread_id * 1000 + i, i as u64, trace_id));
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(log.event_count(), 400);
    }

    /// Logging past BUFFER_SIZE wraps the ring and still serves queries.
    #[test]
    fn test_ring_buffer_wraparound() {
        let log = EventLog::new();
        let trace_id = TraceId::new();
        for i in 0..(TraceConfig::BUFFER_SIZE + 100) {
            log.log(Event::packet_sent(i as u32, i as u64, trace_id));
        }
        assert_eq!(log.recent_events(10).len(), 10);
    }
}