Skip to main content

synapse_pingora/payload/
endpoint_stats.rs

1//! Per-endpoint payload statistics tracking.
2
3use serde::{Deserialize, Serialize};
4use std::collections::VecDeque;
5use std::time::Instant;
6
7use crate::profiler::Distribution;
8
9/// Statistics for payload sizes.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SizeStats {
12    /// Number of samples
13    pub count: u64,
14    /// Total bytes
15    pub total_bytes: u64,
16    /// Minimum size seen
17    pub min_bytes: u64,
18    /// Maximum size seen
19    pub max_bytes: u64,
20    /// P50 (median) size
21    pub p50_bytes: f64,
22    /// P95 size
23    pub p95_bytes: f64,
24    /// P99 size
25    pub p99_bytes: f64,
26}
27
28impl Default for SizeStats {
29    fn default() -> Self {
30        Self {
31            count: 0,
32            total_bytes: 0,
33            min_bytes: u64::MAX,
34            max_bytes: 0,
35            p50_bytes: 0.0,
36            p95_bytes: 0.0,
37            p99_bytes: 0.0,
38        }
39    }
40}
41
42impl SizeStats {
43    /// Calculate average bytes.
44    pub fn avg_bytes(&self) -> f64 {
45        if self.count == 0 {
46            0.0
47        } else {
48            self.total_bytes as f64 / self.count as f64
49        }
50    }
51
52    /// Create stats from a Distribution.
53    pub fn from_distribution(dist: &Distribution, total_bytes: u64) -> Self {
54        let (p50, p95, p99) = dist.percentiles();
55        Self {
56            count: dist.count() as u64,
57            total_bytes,
58            min_bytes: dist.min() as u64,
59            max_bytes: dist.max() as u64,
60            p50_bytes: p50,
61            p95_bytes: p95,
62            p99_bytes: p99,
63        }
64    }
65}
66
/// A fixed-length time window used for sliding aggregation of payload sizes.
///
/// A window accumulates byte totals and a request count from the moment it is
/// created until [`PayloadWindow::is_expired`] reports that `end` has passed.
#[derive(Debug, Clone)]
pub struct PayloadWindow {
    /// Instant at which the window was created.
    pub start: Instant,
    /// Instant at which the window expires (`start` plus the requested duration).
    pub end: Instant,
    /// Sum of request bytes recorded in this window (saturates at `u64::MAX`).
    pub request_bytes: u64,
    /// Sum of response bytes recorded in this window (saturates at `u64::MAX`).
    pub response_bytes: u64,
    /// Number of `record` calls made on this window (saturates at `u64::MAX`).
    pub request_count: u64,
}

impl PayloadWindow {
    /// Creates an empty window that starts now and lasts `duration_ms` milliseconds.
    ///
    /// # Panics
    ///
    /// `Instant + Duration` panics if the resulting instant cannot be
    /// represented on the current platform, which can only happen for
    /// extremely large `duration_ms` values.
    pub fn new(duration_ms: u64) -> Self {
        let now = Instant::now();
        Self {
            start: now,
            end: now + std::time::Duration::from_millis(duration_ms),
            request_bytes: 0,
            response_bytes: 0,
            request_count: 0,
        }
    }

    /// Returns `true` once the current time has reached or passed `end`.
    pub fn is_expired(&self) -> bool {
        Instant::now() >= self.end
    }

    /// Adds one request/response pair to the window totals.
    ///
    /// The counters saturate at `u64::MAX` instead of overflowing. Sizes come
    /// from the caller, so an oversized value must not be able to trigger an
    /// arithmetic-overflow panic (overflow-checked builds) or silently wrap the
    /// totals back towards zero (unchecked builds).
    pub fn record(&mut self, request_bytes: u64, response_bytes: u64) {
        self.request_bytes = self.request_bytes.saturating_add(request_bytes);
        self.response_bytes = self.response_bytes.saturating_add(response_bytes);
        self.request_count = self.request_count.saturating_add(1);
    }
}
107
108/// Serializable version of PayloadWindow for API responses.
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct PayloadWindowSnapshot {
111    pub start_ms: i64,
112    pub end_ms: i64,
113    pub request_bytes: u64,
114    pub response_bytes: u64,
115    pub request_count: u64,
116}
117
118/// Statistics for a single endpoint template.
119pub struct EndpointPayloadStats {
120    /// Endpoint template (e.g., "/api/users/{id}")
121    pub template: String,
122    /// Request size distribution
123    pub request_dist: Distribution,
124    /// Response size distribution
125    pub response_dist: Distribution,
126    /// Total request bytes
127    pub total_request_bytes: u64,
128    /// Total response bytes
129    pub total_response_bytes: u64,
130    /// Sliding windows for recent data
131    pub windows: VecDeque<PayloadWindow>,
132    /// Current (active) window
133    pub current_window: PayloadWindow,
134    /// Window duration in ms
135    window_duration_ms: u64,
136    /// Maximum windows to keep
137    max_windows: usize,
138    /// First seen timestamp
139    pub first_seen: Instant,
140    /// Last seen timestamp
141    pub last_seen: Instant,
142    /// Access counter for LRU
143    pub access_count: u64,
144}
145
146impl EndpointPayloadStats {
147    /// Create new stats for an endpoint.
148    pub fn new(template: String, window_duration_ms: u64, max_windows: usize) -> Self {
149        let now = Instant::now();
150        Self {
151            template,
152            request_dist: Distribution::new(),
153            response_dist: Distribution::new(),
154            total_request_bytes: 0,
155            total_response_bytes: 0,
156            windows: VecDeque::with_capacity(max_windows),
157            current_window: PayloadWindow::new(window_duration_ms),
158            window_duration_ms,
159            max_windows,
160            first_seen: now,
161            last_seen: now,
162            access_count: 0,
163        }
164    }
165
166    /// Record a request/response pair.
167    pub fn record(&mut self, request_bytes: u64, response_bytes: u64) {
168        self.last_seen = Instant::now();
169        self.access_count += 1;
170
171        // Update distributions
172        self.request_dist.update(request_bytes as f64);
173        self.response_dist.update(response_bytes as f64);
174
175        // Update totals
176        self.total_request_bytes += request_bytes;
177        self.total_response_bytes += response_bytes;
178
179        // Rotate window if needed
180        if self.current_window.is_expired() {
181            self.rotate_window();
182        }
183
184        // Record in current window
185        self.current_window.record(request_bytes, response_bytes);
186    }
187
188    /// Rotate to a new window.
189    fn rotate_window(&mut self) {
190        let old_window = std::mem::replace(
191            &mut self.current_window,
192            PayloadWindow::new(self.window_duration_ms),
193        );
194        self.windows.push_back(old_window);
195
196        // Evict old windows
197        while self.windows.len() > self.max_windows {
198            self.windows.pop_front();
199        }
200    }
201
202    /// Get request size stats.
203    pub fn request_stats(&self) -> SizeStats {
204        SizeStats::from_distribution(&self.request_dist, self.total_request_bytes)
205    }
206
207    /// Get response size stats.
208    pub fn response_stats(&self) -> SizeStats {
209        SizeStats::from_distribution(&self.response_dist, self.total_response_bytes)
210    }
211
212    /// Get total request count.
213    pub fn request_count(&self) -> u64 {
214        self.request_dist.count() as u64
215    }
216
217    /// Get bytes per minute (from recent windows).
218    pub fn bytes_per_minute(&self) -> (u64, u64) {
219        if self.windows.is_empty() {
220            return (0, 0);
221        }
222
223        let mut total_request = 0u64;
224        let mut total_response = 0u64;
225
226        for window in &self.windows {
227            total_request += window.request_bytes;
228            total_response += window.response_bytes;
229        }
230
231        // Average across windows
232        let count = self.windows.len() as u64;
233        (total_request / count, total_response / count)
234    }
235}
236
237/// Serializable snapshot for API responses.
238#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct EndpointPayloadStatsSnapshot {
240    pub template: String,
241    pub request: SizeStats,
242    pub response: SizeStats,
243    pub request_count: u64,
244    pub first_seen_ms: i64,
245    pub last_seen_ms: i64,
246}
247
248impl From<&EndpointPayloadStats> for EndpointPayloadStatsSnapshot {
249    fn from(stats: &EndpointPayloadStats) -> Self {
250        let now = chrono::Utc::now().timestamp_millis();
251        let first_elapsed = stats.first_seen.elapsed().as_millis() as i64;
252        let last_elapsed = stats.last_seen.elapsed().as_millis() as i64;
253
254        Self {
255            template: stats.template.clone(),
256            request: stats.request_stats(),
257            response: stats.response_stats(),
258            request_count: stats.request_count(),
259            first_seen_ms: now - first_elapsed,
260            last_seen_ms: now - last_elapsed,
261        }
262    }
263}