use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use parking_lot::Mutex;
use super::guard::SubgraphRequestGuard;
/// Snapshot produced by [`RouterOverheadTracker::calculate_overhead`].
#[derive(Clone, Copy, Debug)]
pub(crate) struct OverheadResult {
    /// Time elapsed since the request started, minus the time attributed to
    /// subgraph requests (saturating at zero).
    pub(crate) overhead: Duration,
    /// Number of subgraph requests that were still active when the snapshot
    /// was taken.
    pub(crate) active_subgraph_requests: u64,
}
/// Tracks how much of a request's wall-clock time was spent outside of
/// subgraph requests.
///
/// Cloning is cheap: the start instant is copied and every clone shares the
/// same mutex-protected state through the `Arc`.
#[derive(Clone, Debug)]
pub(crate) struct RouterOverheadTracker {
    /// Instant captured when the tracker was constructed.
    request_start: Instant,
    /// Shared bookkeeping, also handed to every guard created by
    /// `create_guard`.
    inner: Arc<Mutex<TrackerInner>>,
}
/// Mutable bookkeeping shared between a tracker and its guards.
///
/// NOTE(review): the fields are presumably updated by `SubgraphRequestGuard`
/// on creation and drop; that code lives in `super::guard` and is not visible
/// from this module — confirm there.
#[derive(Debug)]
pub(in crate::plugins::telemetry) struct TrackerInner {
    /// Subgraph time accumulated so far; subtracted from the total elapsed
    /// time when computing overhead.
    pub(in crate::plugins::telemetry) accumulated_subgraph_time: Duration,
    /// Start of the period that is currently open, if any. Only consulted
    /// while `active_count` is greater than zero.
    pub(in crate::plugins::telemetry) current_period_start: Option<Instant>,
    /// Number of subgraph requests currently in flight.
    pub(in crate::plugins::telemetry) active_count: u64,
}
impl Default for RouterOverheadTracker {
    /// Builds a tracker whose clock starts now, with no subgraph time
    /// recorded and no active subgraph requests.
    fn default() -> Self {
        // Capture the start instant first so that setting up the shared
        // state is counted as part of the request.
        let request_start = Instant::now();
        let initial_state = TrackerInner {
            active_count: 0,
            current_period_start: None,
            accumulated_subgraph_time: Duration::ZERO,
        };

        Self {
            request_start,
            inner: Arc::new(Mutex::new(initial_state)),
        }
    }
}
impl RouterOverheadTracker {
pub(crate) fn new() -> Self {
Self::default()
}
pub(crate) fn create_guard(&self) -> SubgraphRequestGuard {
SubgraphRequestGuard::new(self.inner.clone())
}
pub(crate) fn calculate_overhead(&self) -> OverheadResult {
let total_elapsed = self.request_start.elapsed();
let inner = self.inner.lock();
let active_count = inner.active_count;
let accumulated_time = if active_count > 0 {
if let Some(period_start) = inner.current_period_start {
inner.accumulated_subgraph_time + period_start.elapsed()
} else {
inner.accumulated_subgraph_time
}
} else {
inner.accumulated_subgraph_time
};
let overhead = total_elapsed.saturating_sub(accumulated_time);
OverheadResult {
overhead,
active_subgraph_requests: active_count,
}
}
pub(crate) fn total_duration(&self) -> Duration {
self.request_start.elapsed()
}
}
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Blocks the current thread for `ms` milliseconds.
    fn sleep_ms(ms: u64) {
        thread::sleep(Duration::from_millis(ms));
    }

    /// Asserts that the measured overhead lies in `[min_ms, max_ms]`.
    ///
    /// `#[track_caller]` makes a failure point at the calling test rather
    /// than at this helper.
    #[track_caller]
    fn assert_overhead_within(result: OverheadResult, min_ms: u64, max_ms: u64) {
        let allowed = Duration::from_millis(min_ms)..=Duration::from_millis(max_ms);
        assert!(
            allowed.contains(&result.overhead),
            "overhead was {:?}",
            result.overhead
        );
    }

    /// One guarded period followed by unguarded work: only the unguarded
    /// part counts as overhead.
    #[test]
    fn test_single_subgraph_request() {
        let tracker = RouterOverheadTracker::new();
        {
            let _guard = tracker.create_guard();
            sleep_ms(100);
        }
        sleep_ms(50);

        let result = tracker.calculate_overhead();
        assert_eq!(result.active_subgraph_requests, 0);
        assert_overhead_within(result, 30, 200);
    }

    /// Two guarded periods separated by a gap: only the gap counts as
    /// overhead.
    #[test]
    fn test_sequential_subgraph_requests() {
        let tracker = RouterOverheadTracker::new();
        {
            let _guard = tracker.create_guard();
            sleep_ms(50);
        }
        sleep_ms(20);
        {
            let _guard = tracker.create_guard();
            sleep_ms(50);
        }

        let result = tracker.calculate_overhead();
        assert_eq!(result.active_subgraph_requests, 0);
        assert_overhead_within(result, 5, 100);
    }

    /// Overlapping guards must not be double counted: only the time before
    /// the first guard counts as overhead.
    #[test]
    fn test_concurrent_subgraph_requests() {
        let tracker = RouterOverheadTracker::new();
        sleep_ms(10);
        let guard1 = tracker.create_guard();
        sleep_ms(50);
        let guard2 = tracker.create_guard();
        sleep_ms(50);
        drop(guard1);
        sleep_ms(50);
        drop(guard2);

        let result = tracker.calculate_overhead();
        assert_eq!(result.active_subgraph_requests, 0);
        assert_overhead_within(result, 3, 100);
    }

    /// Without any guard the whole elapsed time is overhead.
    #[test]
    fn test_no_subgraph_requests() {
        let tracker = RouterOverheadTracker::new();
        sleep_ms(100);

        let result = tracker.calculate_overhead();
        assert_eq!(result.active_subgraph_requests, 0);
        assert_overhead_within(result, 80, 250);
    }

    /// A guard dropped during unwinding must still release its request and
    /// have its time excluded from the overhead.
    #[test]
    fn test_guard_drop_on_panic() {
        let tracker = Arc::new(RouterOverheadTracker::new());
        let tracker_clone = Arc::clone(&tracker);
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
            let _guard = tracker_clone.create_guard();
            sleep_ms(50);
            panic!("simulated error");
        }));
        assert!(outcome.is_err());
        sleep_ms(10);

        let result = tracker.calculate_overhead();
        let total = tracker.total_duration();
        assert_eq!(result.active_subgraph_requests, 0);
        // The guard was held for at least 50ms, so the overhead must be well
        // below the total wall-clock time. (Comparing against
        // `Duration::ZERO` would always pass because `Duration` is unsigned.)
        assert!(
            result.overhead + Duration::from_millis(40) < total,
            "overhead was {:?}, total was {:?}",
            result.overhead,
            total
        );
    }

    /// Guards created and dropped from many threads leave the tracker with
    /// no active requests and with their time excluded from the overhead.
    #[test]
    fn test_thread_safety() {
        let tracker = Arc::new(RouterOverheadTracker::new());
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let tracker_clone = Arc::clone(&tracker);
                thread::spawn(move || {
                    let _guard = tracker_clone.create_guard();
                    sleep_ms(10);
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }

        let result = tracker.calculate_overhead();
        let total = tracker.total_duration();
        assert_eq!(result.active_subgraph_requests, 0);
        // At least one 10ms guarded period was recorded, so the overhead
        // must be noticeably smaller than the total wall-clock time.
        assert!(
            result.overhead + Duration::from_millis(5) < total,
            "overhead was {:?}, total was {:?}",
            result.overhead,
            total
        );
    }

    /// The active-request count follows the lifetime of a guard.
    #[test]
    fn test_active_subgraph_requests_flag() {
        let tracker = RouterOverheadTracker::new();
        sleep_ms(10);
        let result = tracker.calculate_overhead();
        assert_eq!(result.active_subgraph_requests, 0);

        // Not underscore-prefixed: the binding is used by `drop` below.
        let guard = tracker.create_guard();
        sleep_ms(10);
        let result = tracker.calculate_overhead();
        assert_eq!(
            result.active_subgraph_requests, 1,
            "Should have active subgraph requests"
        );

        drop(guard);
        sleep_ms(10);
        let result = tracker.calculate_overhead();
        assert_eq!(
            result.active_subgraph_requests, 0,
            "Should not have active subgraph requests"
        );
    }
}