synapse_pingora/payload/
entity_bandwidth.rs1use serde::{Deserialize, Serialize};
4use std::collections::VecDeque;
5use std::time::Instant;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct BandwidthBucket {
10 pub timestamp_ms: i64,
12 pub request_bytes: u64,
14 pub response_bytes: u64,
16 pub request_count: u64,
18}
19
20impl BandwidthBucket {
21 pub fn new() -> Self {
23 Self {
24 timestamp_ms: chrono::Utc::now().timestamp_millis(),
25 request_bytes: 0,
26 response_bytes: 0,
27 request_count: 0,
28 }
29 }
30
31 pub fn total_bytes(&self) -> u64 {
33 self.request_bytes + self.response_bytes
34 }
35}
36
37impl Default for BandwidthBucket {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43pub struct EntityBandwidth {
45 pub entity_id: String,
47 pub total_request_bytes: u64,
49 pub total_response_bytes: u64,
51 pub total_request_count: u64,
53 buckets: VecDeque<BandwidthBucket>,
55 current_bucket: BandwidthBucket,
57 bucket_duration_ms: u64,
59 max_buckets: usize,
61 last_rotation: Instant,
63 pub first_seen: Instant,
65 pub last_seen: Instant,
67 pub access_count: u64,
69}
70
71impl EntityBandwidth {
72 pub fn new(entity_id: String, bucket_duration_ms: u64, max_buckets: usize) -> Self {
74 let now = Instant::now();
75 Self {
76 entity_id,
77 total_request_bytes: 0,
78 total_response_bytes: 0,
79 total_request_count: 0,
80 buckets: VecDeque::with_capacity(max_buckets),
81 current_bucket: BandwidthBucket::new(),
82 bucket_duration_ms,
83 max_buckets,
84 last_rotation: now,
85 first_seen: now,
86 last_seen: now,
87 access_count: 0,
88 }
89 }
90
91 pub fn record(&mut self, request_bytes: u64, response_bytes: u64) {
93 self.last_seen = Instant::now();
94 self.access_count += 1;
95
96 self.total_request_bytes += request_bytes;
98 self.total_response_bytes += response_bytes;
99 self.total_request_count += 1;
100
101 if self.last_rotation.elapsed().as_millis() >= self.bucket_duration_ms as u128 {
103 self.rotate_bucket();
104 }
105
106 self.current_bucket.request_bytes += request_bytes;
108 self.current_bucket.response_bytes += response_bytes;
109 self.current_bucket.request_count += 1;
110 }
111
112 fn rotate_bucket(&mut self) {
114 let old_bucket = std::mem::take(&mut self.current_bucket);
115 self.buckets.push_back(old_bucket);
116 self.last_rotation = Instant::now();
117
118 while self.buckets.len() > self.max_buckets {
120 self.buckets.pop_front();
121 }
122 }
123
124 pub fn avg_bytes_per_minute(&self) -> u64 {
126 if self.buckets.is_empty() {
127 return self.current_bucket.total_bytes();
128 }
129
130 let total: u64 = self.buckets.iter().map(|b| b.total_bytes()).sum();
131 total / self.buckets.len() as u64
132 }
133
134 pub fn current_bytes_per_minute(&self) -> u64 {
136 self.current_bucket.total_bytes()
137 }
138
139 pub fn total_bytes(&self) -> u64 {
141 self.total_request_bytes + self.total_response_bytes
142 }
143
144 pub fn recent_buckets(&self) -> Vec<BandwidthBucket> {
146 self.buckets.iter().cloned().collect()
147 }
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct EntityBandwidthSnapshot {
153 pub entity_id: String,
154 pub total_request_bytes: u64,
155 pub total_response_bytes: u64,
156 pub total_request_count: u64,
157 pub bytes_per_minute: u64,
158 pub first_seen_ms: i64,
159 pub last_seen_ms: i64,
160 pub recent_buckets: Vec<BandwidthBucket>,
161}
162
163impl From<&EntityBandwidth> for EntityBandwidthSnapshot {
164 fn from(entity: &EntityBandwidth) -> Self {
165 let now = chrono::Utc::now().timestamp_millis();
166 let first_elapsed = entity.first_seen.elapsed().as_millis() as i64;
167 let last_elapsed = entity.last_seen.elapsed().as_millis() as i64;
168
169 Self {
170 entity_id: entity.entity_id.clone(),
171 total_request_bytes: entity.total_request_bytes,
172 total_response_bytes: entity.total_response_bytes,
173 total_request_count: entity.total_request_count,
174 bytes_per_minute: entity.avg_bytes_per_minute(),
175 first_seen_ms: now - first_elapsed,
176 last_seen_ms: now - last_elapsed,
177 recent_buckets: entity.recent_buckets(),
178 }
179 }
180}