fraiseql_wire/stream/adaptive_chunking.rs
1//! Adaptive chunk sizing based on channel occupancy patterns
2//!
3//! This module implements self-tuning chunk sizes that automatically adjust batch sizes
4//! based on observed backpressure (channel occupancy).
5//!
6//! **Critical Semantics**:
7//! `chunk_size` controls **both**:
8//! 1. MPSC channel capacity (backpressure buffer)
9//! 2. Batch size for Postgres row parsing
10//!
11//! **Control Signal Interpretation**:
12//! - **High occupancy** (>80%): Producer waiting on channel capacity, consumer slow
13//! → **Reduce `chunk_size`**: smaller batches reduce pressure, lower latency per item
14//!
15//! - **Low occupancy** (<20%): Consumer faster than producer, frequent context switches
16//! → **Increase `chunk_size`**: larger batches amortize parsing cost, less frequent wakeups
17//!
18//! **Design Principles**:
19//! - Measurement-based adjustment (50-item window) for stability
20//! - Hysteresis band (20%-80%) prevents frequent oscillation
21//! - Minimum adjustment interval (1 second) prevents thrashing
22//! - Conservative bounds (16-1024) prevent pathological extremes
23//! - Clear window reset after adjustment (fresh observations)
24
25use std::collections::VecDeque;
26use std::time::{Duration, Instant};
27
/// A single sample of how full the channel was when it was observed
#[derive(Copy, Clone, Debug)]
struct Occupancy {
    /// Share of the channel capacity in use, as a whole percentage (0-100)
    percentage: usize,
}

/// Tracks channel occupancy and automatically adjusts chunk size based on backpressure
///
/// The controller collects occupancy samples in a rolling window and, once the window is
/// full, grows or shrinks the chunk size when the average occupancy leaves the 20%-80% band.
///
/// # Examples
///
/// ```rust
/// use fraiseql_wire::stream::AdaptiveChunking;
/// let mut adaptive = AdaptiveChunking::new();
/// let (buffered_items, channel_capacity) = (50usize, 256usize);
///
/// // Periodically observe channel occupancy
/// for _chunk_sent in 0..100 {
///     if let Some(new_size) = adaptive.observe(buffered_items, channel_capacity) {
///         println!("Adjusted chunk size to {}", new_size);
///     }
/// }
/// ```
#[derive(Debug)]
pub struct AdaptiveChunking {
    /// Current chunk size (mutable, adjusted over time)
    current_size: usize,

    /// Absolute minimum chunk size (never decrease below this)
    min_size: usize,

    /// Absolute maximum chunk size (never increase beyond this)
    max_size: usize,

    /// Number of measurements to collect before making adjustment decision
    adjustment_window: usize,

    /// Rolling window of recent occupancy observations (oldest at the front)
    measurements: VecDeque<Occupancy>,

    /// Timestamp of last chunk size adjustment (for rate limiting);
    /// `None` until the first adjustment has happened
    last_adjustment_time: Option<Instant>,

    /// Minimum time between adjustments (prevents thrashing/oscillation)
    min_adjustment_interval: Duration,
}
73
74impl AdaptiveChunking {
75 /// Create a new adaptive chunking controller with default bounds
76 ///
77 /// **Defaults**:
78 /// - Initial chunk size: 256 items
79 /// - Min size: 16 items
80 /// - Max size: 1024 items
81 /// - Adjustment window: 50 observations
82 /// - Min adjustment interval: 1 second
83 ///
84 /// # Examples
85 ///
86 /// ```rust
87 /// use fraiseql_wire::stream::AdaptiveChunking;
88 /// let adaptive = AdaptiveChunking::new();
89 /// assert_eq!(adaptive.current_size(), 256);
90 /// ```
91 pub fn new() -> Self {
92 Self {
93 current_size: 256,
94 min_size: 16,
95 max_size: 1024,
96 adjustment_window: 50,
97 measurements: VecDeque::with_capacity(50),
98 last_adjustment_time: None,
99 min_adjustment_interval: Duration::from_secs(1),
100 }
101 }
102
103 /// Record an occupancy observation and check if chunk size adjustment is warranted
104 ///
105 /// Call this method after each chunk is sent to the channel.
106 /// Returns `Some(new_size)` if an adjustment should be applied, `None` otherwise.
107 ///
108 /// # Arguments
109 ///
110 /// * `items_buffered` - Number of items currently in the channel
111 /// * `capacity` - Total capacity of the channel (usually equal to `chunk_size`)
112 ///
113 /// # Examples
114 ///
115 /// ```rust
116 /// use fraiseql_wire::stream::AdaptiveChunking;
117 /// let mut adaptive = AdaptiveChunking::new();
118 ///
119 /// // Simulate high occupancy (90%)
120 /// for _ in 0..50 {
121 /// adaptive.observe(230, 256); // ~90% occupancy
122 /// }
123 ///
124 /// // On the 51st observation, should trigger adjustment
125 /// if let Some(new_size) = adaptive.observe(230, 256) {
126 /// println!("Adjusted to {}", new_size); // Will be < 256
127 /// }
128 /// ```
129 pub fn observe(&mut self, items_buffered: usize, capacity: usize) -> Option<usize> {
130 // Calculate occupancy percentage (clamped at 100% if buffer exceeds capacity)
131 // Special case: if capacity is 0, treat occupancy as 0% (consumer draining instantly)
132 let pct = if capacity == 0 {
133 0
134 } else {
135 (items_buffered * 100)
136 .checked_div(capacity)
137 .unwrap_or(100)
138 .min(100)
139 };
140
141 // Record this observation
142 self.measurements.push_back(Occupancy { percentage: pct });
143
144 // Keep only the most recent measurements in the window
145 while self.measurements.len() > self.adjustment_window {
146 self.measurements.pop_front();
147 }
148
149 // Only consider adjustment if we have a FULL window of observations
150 // (i.e., exactly equal to the window size, not more)
151 // This ensures we only evaluate after collecting N measurements
152 if self.measurements.len() == self.adjustment_window && self.should_adjust() {
153 return self.calculate_adjustment();
154 }
155
156 None
157 }
158
159 /// Get the current chunk size
160 ///
161 /// # Examples
162 ///
163 /// ```rust
164 /// use fraiseql_wire::stream::AdaptiveChunking;
165 /// let adaptive = AdaptiveChunking::new();
166 /// assert_eq!(adaptive.current_size(), 256);
167 /// ```
168 pub const fn current_size(&self) -> usize {
169 self.current_size
170 }
171
172 /// Set custom min/max bounds for chunk size adjustments
173 ///
174 /// Allows overriding the default bounds (16-1024) with custom limits.
175 /// The current chunk size will be clamped to the new bounds.
176 ///
177 /// # Arguments
178 ///
179 /// * `min_size` - Minimum chunk size (must be > 0)
180 /// * `max_size` - Maximum chunk size (must be >= `min_size`)
181 ///
182 /// # Examples
183 ///
184 /// ```rust
185 /// use fraiseql_wire::stream::AdaptiveChunking;
186 /// let mut adaptive = AdaptiveChunking::new();
187 /// adaptive = adaptive.with_bounds(32, 512); // Custom range 32-512
188 /// assert!(adaptive.current_size() >= 32);
189 /// assert!(adaptive.current_size() <= 512);
190 /// ```
191 pub fn with_bounds(mut self, min_size: usize, max_size: usize) -> Self {
192 // Basic validation
193 if min_size == 0 || max_size < min_size {
194 tracing::warn!(
195 "invalid chunk bounds: min={}, max={}, keeping defaults",
196 min_size,
197 max_size
198 );
199 return self;
200 }
201
202 self.min_size = min_size;
203 self.max_size = max_size;
204
205 // Clamp current size to new bounds
206 if self.current_size < min_size {
207 self.current_size = min_size;
208 } else if self.current_size > max_size {
209 self.current_size = max_size;
210 }
211
212 tracing::debug!(
213 "adaptive chunking bounds set: min={}, max={}, current={}",
214 self.min_size,
215 self.max_size,
216 self.current_size
217 );
218
219 self
220 }
221
222 /// Calculate average occupancy percentage over the measurement window
223 fn average_occupancy(&self) -> usize {
224 if self.measurements.is_empty() {
225 return 0;
226 }
227
228 let sum: usize = self.measurements.iter().map(|m| m.percentage).sum();
229 sum / self.measurements.len()
230 }
231
232 /// Check if adjustment conditions are met
233 ///
234 /// Adjustment is only considered if:
235 /// 1. At least 1 second has elapsed since the last adjustment
236 /// 2. Average occupancy is outside the hysteresis band (< 20% or > 80%)
237 fn should_adjust(&self) -> bool {
238 // Rate limit: don't adjust too frequently
239 if let Some(last_adj) = self.last_adjustment_time {
240 if last_adj.elapsed() < self.min_adjustment_interval {
241 return false;
242 }
243 }
244
245 // Hysteresis: only adjust if we're clearly outside the comfort zone
246 let avg = self.average_occupancy();
247 !(20..=80).contains(&avg)
248 }
249
250 /// Calculate the new chunk size based on average occupancy
251 ///
252 /// **Logic**:
253 /// - If avg > 80%: **DECREASE** by factor of 1.5 (high occupancy = producer backed up)
254 /// - If avg < 20%: **INCREASE** by factor of 1.5 (low occupancy = consumer fast)
255 /// - Clamps to [`min_size`, `max_size`]
256 /// - Clears measurements after adjustment
257 ///
258 /// Returns `Some(new_size)` if size actually changed, `None` if no change needed.
259 fn calculate_adjustment(&mut self) -> Option<usize> {
260 let avg = self.average_occupancy();
261 let old_size = self.current_size;
262
263 let new_size = if avg > 80 {
264 // High occupancy: producer is waiting on channel, consumer is slow
265 // → DECREASE chunk_size to reduce backpressure and latency
266 ((self.current_size as f64 / 1.5).floor() as usize).max(self.min_size)
267 } else if avg < 20 {
268 // Low occupancy: consumer is draining fast, producer could batch more
269 // → INCREASE chunk_size to amortize parsing cost and reduce context switches
270 ((self.current_size as f64 * 1.5).ceil() as usize).min(self.max_size)
271 } else {
272 old_size
273 };
274
275 // Only return if there was an actual change
276 if new_size != old_size {
277 self.current_size = new_size;
278 self.last_adjustment_time = Some(Instant::now());
279 self.measurements.clear(); // Reset window for fresh observations
280 Some(new_size)
281 } else {
282 None
283 }
284 }
285}
286
287impl Default for AdaptiveChunking {
288 fn default() -> Self {
289 Self::new()
290 }
291}
292
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
    use super::*;

    #[test]
    fn test_new_defaults() {
        let adaptive = AdaptiveChunking::new();
        assert_eq!(adaptive.current_size(), 256);
        assert_eq!(adaptive.min_size, 16);
        assert_eq!(adaptive.max_size, 1024);
        assert_eq!(adaptive.adjustment_window, 50);
        assert!(adaptive.last_adjustment_time.is_none());
        assert!(adaptive.measurements.is_empty());
    }

    #[test]
    fn test_no_adjustment_in_hysteresis_band() {
        let mut adaptive = AdaptiveChunking::new();

        // Simulate 50% occupancy (inside 20-80% hysteresis band)
        // 50% of 256 = 128 items
        for _ in 0..50 {
            assert_eq!(adaptive.observe(128, 256), None);
        }

        // Should not adjust - still at 256
        assert_eq!(adaptive.current_size(), 256);
    }

    #[test]
    fn test_decrease_on_high_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;

        // Simulate 90% occupancy (producer backed up, consumer slow)
        // 90% of 256 = 230.4 ≈ 230 items
        for _ in 0..49 {
            assert_eq!(adaptive.observe(230, 256), None);
        }

        // On 50th observation, should trigger adjustment
        let result = adaptive.observe(230, 256);
        assert!(result.is_some());

        let new_size = result.unwrap();
        assert!(
            new_size < original_size,
            "Should decrease on high occupancy"
        );
        assert!(new_size >= 16, "Should respect min bound");
    }

    #[test]
    fn test_increase_on_low_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;

        // Simulate 10% occupancy (consumer fast, producer lagging)
        // 10% of 256 = 25.6 ≈ 26 items
        for _ in 0..49 {
            assert_eq!(adaptive.observe(26, 256), None);
        }

        // On 50th observation, should trigger adjustment
        let result = adaptive.observe(26, 256);
        assert!(result.is_some());

        let new_size = result.unwrap();
        assert!(new_size > original_size, "Should increase on low occupancy");
        assert!(new_size <= 1024, "Should respect max bound");
    }

    #[test]
    fn test_respects_min_bound() {
        let mut adaptive = AdaptiveChunking::new();

        // Disable rate limiting: with the default 1-second interval only the first window
        // could adjust and the size would never get anywhere near the minimum.
        adaptive.min_adjustment_interval = std::time::Duration::ZERO;

        // Simulate very high occupancy (250/256 ≈ 97%) over many full windows
        for iteration in 0..20 {
            for _ in 0..50 {
                adaptive.observe(250, 256);
            }

            // Verify we never go below minimum
            assert!(
                adaptive.current_size() >= 16,
                "Iteration {}: size {} < min",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 → 170 → 113 → 75 → 50 → 33 → 22 → 14 (clamped to 16), then pinned
        assert_eq!(
            adaptive.current_size(),
            16,
            "Sustained high occupancy should settle exactly at the min bound"
        );
    }

    #[test]
    fn test_respects_max_bound() {
        let mut adaptive = AdaptiveChunking::new();

        // Disable rate limiting: with the default 1-second interval only the first window
        // could adjust and the size would never get anywhere near the maximum.
        adaptive.min_adjustment_interval = std::time::Duration::ZERO;

        // Simulate very low occupancy (10/256 ≈ 3%) over many full windows
        for iteration in 0..20 {
            for _ in 0..50 {
                adaptive.observe(10, 256);
            }

            // Verify we never go above maximum
            assert!(
                adaptive.current_size() <= 1024,
                "Iteration {}: size {} > max",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 → 384 → 576 → 864 → 1296 (clamped to 1024), then pinned
        assert_eq!(
            adaptive.current_size(),
            1024,
            "Sustained low occupancy should settle exactly at the max bound"
        );
    }

    #[test]
    fn test_respects_min_adjustment_interval() {
        let mut adaptive = AdaptiveChunking::new();

        // Fill window with high occupancy (>80%) and trigger first adjustment
        // 230/256 ≈ 89.8%
        // Make 49 calls so window is not yet full
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }

        // 50th call: window becomes full, should trigger adjustment
        let first_adjustment = adaptive.observe(230, 256);
        assert!(
            first_adjustment.is_some(),
            "Should adjust on 50th observation when window is full"
        );

        let first_size = adaptive.current_size();
        assert!(
            first_size < 256,
            "High occupancy should decrease chunk size"
        );

        // Immediately refill the window with the same high occupancy.
        // Even though the window becomes full again, no adjustment may happen because the
        // 1-second minimum interval has not elapsed yet.
        for _ in 0..50 {
            let result = adaptive.observe(230, 256);
            assert_eq!(
                result, None,
                "Should not adjust again so soon (within min interval)"
            );
        }

        // Should not adjust again immediately, even though window is full again
        assert_eq!(
            adaptive.current_size(),
            first_size,
            "Size should remain unchanged due to rate limiting"
        );
    }

    #[test]
    fn test_window_resets_after_adjustment() {
        let mut adaptive = AdaptiveChunking::new();

        // First window: high occupancy triggers decrease
        // 230/256 ≈ 89.8%
        // Make 49 calls to fill window to size 49
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }

        // 50th call: window becomes full, triggers adjustment
        let first = adaptive.observe(230, 256);
        assert!(
            first.is_some(),
            "Should adjust when window reaches 50 observations"
        );

        // Measurements should be cleared after adjustment
        assert!(
            adaptive.measurements.is_empty(),
            "Measurements should be cleared after adjustment"
        );
    }

    #[test]
    fn test_zero_capacity_handling() {
        let mut adaptive = AdaptiveChunking::new();

        // Zero capacity edge case: percentage = 0
        // 0% occupancy is OUTSIDE hysteresis band (< 20%), so it WILL increase chunk size
        // This makes sense: consumer is draining instantly, we can send bigger batches
        // Make 49 calls so window is not yet full (size 49 < 50)
        for _ in 0..49 {
            let result = adaptive.observe(0, 0);
            // Should not adjust until window is full (50 observations)
            assert_eq!(result, None, "Should not adjust until window is full");
        }

        // On the 50th observation, window becomes full
        // We should trigger an increase because occupancy < 20%
        let result = adaptive.observe(0, 0);
        assert!(
            result.is_some(),
            "Should increase chunk size when occupancy < 20% and window is full"
        );
        assert!(
            adaptive.current_size() > 256,
            "Should increase from 256 due to low occupancy"
        );
    }

    #[test]
    fn test_average_occupancy_calculation() {
        let mut adaptive = AdaptiveChunking::new();

        // Add measurements: 10%, 20%, 30%, 40%, 50%
        // Calculate actual item counts: 25.6, 51.2, 76.8, 102.4, 128
        // Which truncate to: 25, 51, 76, 102, 128
        // And percentages: (25*100)/256=9, (51*100)/256=19, (76*100)/256=29, (102*100)/256=39, (128*100)/256=50
        for pct in [10, 20, 30, 40, 50].iter() {
            let items = (pct * 256) / 100;
            adaptive.observe(items, 256);
        }

        let avg = adaptive.average_occupancy();
        // Average of [9, 19, 29, 39, 50] = 146 / 5 = 29 (integer division)
        assert_eq!(
            avg, 29,
            "Average should account for integer division in percentages"
        );
    }

    #[test]
    fn test_with_bounds_clamps_current_size() {
        // Default size 256 is below the new minimum → raised to 512
        let raised = AdaptiveChunking::new().with_bounds(512, 2048);
        assert_eq!(raised.current_size(), 512);
        assert_eq!(raised.min_size, 512);
        assert_eq!(raised.max_size, 2048);

        // Default size 256 is above the new maximum → lowered to 128
        let lowered = AdaptiveChunking::new().with_bounds(32, 128);
        assert_eq!(lowered.current_size(), 128);
        assert_eq!(lowered.min_size, 32);
        assert_eq!(lowered.max_size, 128);
    }

    #[test]
    fn test_with_bounds_rejects_invalid_bounds() {
        // A zero minimum is invalid: the controller must be returned unchanged
        let zero_min = AdaptiveChunking::new().with_bounds(0, 100);
        assert_eq!(zero_min.min_size, 16);
        assert_eq!(zero_min.max_size, 1024);
        assert_eq!(zero_min.current_size(), 256);

        // max < min is invalid: the controller must be returned unchanged
        let inverted = AdaptiveChunking::new().with_bounds(100, 50);
        assert_eq!(inverted.min_size, 16);
        assert_eq!(inverted.max_size, 1024);
        assert_eq!(inverted.current_size(), 256);
    }
}
527}