Skip to main content

synapse_pingora/payload/
entity_bandwidth.rs

1//! Per-entity (IP) bandwidth tracking.
2
3use serde::{Deserialize, Serialize};
4use std::collections::VecDeque;
5use std::time::Instant;
6
7/// A time bucket for bandwidth tracking.
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct BandwidthBucket {
10    /// Bucket timestamp (Unix ms)
11    pub timestamp_ms: i64,
12    /// Request bytes in this bucket
13    pub request_bytes: u64,
14    /// Response bytes in this bucket
15    pub response_bytes: u64,
16    /// Request count in this bucket
17    pub request_count: u64,
18}
19
20impl BandwidthBucket {
21    /// Create a new bucket with current timestamp.
22    pub fn new() -> Self {
23        Self {
24            timestamp_ms: chrono::Utc::now().timestamp_millis(),
25            request_bytes: 0,
26            response_bytes: 0,
27            request_count: 0,
28        }
29    }
30
31    /// Total bytes in this bucket.
32    pub fn total_bytes(&self) -> u64 {
33        self.request_bytes + self.response_bytes
34    }
35}
36
37impl Default for BandwidthBucket {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43/// Bandwidth tracking for a single entity (IP).
44pub struct EntityBandwidth {
45    /// Entity identifier (usually IP address)
46    pub entity_id: String,
47    /// Total request bytes
48    pub total_request_bytes: u64,
49    /// Total response bytes
50    pub total_response_bytes: u64,
51    /// Total request count
52    pub total_request_count: u64,
53    /// Recent time buckets for spike detection
54    buckets: VecDeque<BandwidthBucket>,
55    /// Current active bucket
56    current_bucket: BandwidthBucket,
57    /// Bucket duration in ms
58    bucket_duration_ms: u64,
59    /// Maximum buckets to keep
60    max_buckets: usize,
61    /// Last bucket rotation time
62    last_rotation: Instant,
63    /// First seen timestamp
64    pub first_seen: Instant,
65    /// Last seen timestamp
66    pub last_seen: Instant,
67    /// Access counter for LRU
68    pub access_count: u64,
69}
70
71impl EntityBandwidth {
72    /// Create new bandwidth tracking for an entity.
73    pub fn new(entity_id: String, bucket_duration_ms: u64, max_buckets: usize) -> Self {
74        let now = Instant::now();
75        Self {
76            entity_id,
77            total_request_bytes: 0,
78            total_response_bytes: 0,
79            total_request_count: 0,
80            buckets: VecDeque::with_capacity(max_buckets),
81            current_bucket: BandwidthBucket::new(),
82            bucket_duration_ms,
83            max_buckets,
84            last_rotation: now,
85            first_seen: now,
86            last_seen: now,
87            access_count: 0,
88        }
89    }
90
91    /// Record a request/response pair.
92    pub fn record(&mut self, request_bytes: u64, response_bytes: u64) {
93        self.last_seen = Instant::now();
94        self.access_count += 1;
95
96        // Update totals
97        self.total_request_bytes += request_bytes;
98        self.total_response_bytes += response_bytes;
99        self.total_request_count += 1;
100
101        // Rotate bucket if needed
102        if self.last_rotation.elapsed().as_millis() >= self.bucket_duration_ms as u128 {
103            self.rotate_bucket();
104        }
105
106        // Record in current bucket
107        self.current_bucket.request_bytes += request_bytes;
108        self.current_bucket.response_bytes += response_bytes;
109        self.current_bucket.request_count += 1;
110    }
111
112    /// Rotate to a new bucket.
113    fn rotate_bucket(&mut self) {
114        let old_bucket = std::mem::take(&mut self.current_bucket);
115        self.buckets.push_back(old_bucket);
116        self.last_rotation = Instant::now();
117
118        // Evict old buckets
119        while self.buckets.len() > self.max_buckets {
120            self.buckets.pop_front();
121        }
122    }
123
124    /// Get average bytes per minute from recent buckets.
125    pub fn avg_bytes_per_minute(&self) -> u64 {
126        if self.buckets.is_empty() {
127            return self.current_bucket.total_bytes();
128        }
129
130        let total: u64 = self.buckets.iter().map(|b| b.total_bytes()).sum();
131        total / self.buckets.len() as u64
132    }
133
134    /// Get current bytes per minute (from current bucket).
135    pub fn current_bytes_per_minute(&self) -> u64 {
136        self.current_bucket.total_bytes()
137    }
138
139    /// Get total bytes.
140    pub fn total_bytes(&self) -> u64 {
141        self.total_request_bytes + self.total_response_bytes
142    }
143
144    /// Get recent buckets as snapshots.
145    pub fn recent_buckets(&self) -> Vec<BandwidthBucket> {
146        self.buckets.iter().cloned().collect()
147    }
148}
149
150/// Serializable snapshot for API responses.
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct EntityBandwidthSnapshot {
153    pub entity_id: String,
154    pub total_request_bytes: u64,
155    pub total_response_bytes: u64,
156    pub total_request_count: u64,
157    pub bytes_per_minute: u64,
158    pub first_seen_ms: i64,
159    pub last_seen_ms: i64,
160    pub recent_buckets: Vec<BandwidthBucket>,
161}
162
163impl From<&EntityBandwidth> for EntityBandwidthSnapshot {
164    fn from(entity: &EntityBandwidth) -> Self {
165        let now = chrono::Utc::now().timestamp_millis();
166        let first_elapsed = entity.first_seen.elapsed().as_millis() as i64;
167        let last_elapsed = entity.last_seen.elapsed().as_millis() as i64;
168
169        Self {
170            entity_id: entity.entity_id.clone(),
171            total_request_bytes: entity.total_request_bytes,
172            total_response_bytes: entity.total_response_bytes,
173            total_request_count: entity.total_request_count,
174            bytes_per_minute: entity.avg_bytes_per_minute(),
175            first_seen_ms: now - first_elapsed,
176            last_seen_ms: now - last_elapsed,
177            recent_buckets: entity.recent_buckets(),
178        }
179    }
180}