fraiseql_wire/stream/adaptive_chunking.rs
1//! Adaptive chunk sizing based on channel occupancy patterns
2//!
3//! This module implements self-tuning chunk sizes that automatically adjust batch sizes
4//! based on observed backpressure (channel occupancy).
5//!
6//! **Critical Semantics**:
7//! `chunk_size` controls **both**:
8//! 1. MPSC channel capacity (backpressure buffer)
9//! 2. Batch size for Postgres row parsing
10//!
11//! **Control Signal Interpretation**:
12//! - **High occupancy** (>80%): Producer waiting on channel capacity, consumer slow
13//! → **Reduce chunk_size**: smaller batches reduce pressure, lower latency per item
14//!
15//! - **Low occupancy** (<20%): Consumer faster than producer, frequent context switches
16//! → **Increase chunk_size**: larger batches amortize parsing cost, less frequent wakeups
17//!
18//! **Design Principles**:
19//! - Measurement-based adjustment (50-item window) for stability
20//! - Hysteresis band (20%-80%) prevents frequent oscillation
21//! - Minimum adjustment interval (1 second) prevents thrashing
22//! - Conservative bounds (16-1024) prevent pathological extremes
23//! - Clear window reset after adjustment (fresh observations)
24
25use std::collections::VecDeque;
26use std::time::{Duration, Instant};
27
/// One sampled reading of how full the channel was at a point in time.
#[derive(Debug, Clone, Copy)]
struct Occupancy {
    /// Share of the channel capacity in use, as a whole percent (0-100).
    percentage: usize,
}
34
35/// Tracks channel occupancy and automatically adjusts chunk size based on backpressure
36///
37/// # Examples
38///
39/// ```ignore
40/// let mut adaptive = AdaptiveChunking::new();
41///
42/// // Periodically observe channel occupancy
43/// for chunk_sent in 0..100 {
44/// let occupancy_pct = (buffered_items * 100) / channel_capacity;
45/// if let Some(new_size) = adaptive.observe(buffered_items, channel_capacity) {
46/// println!("Adjusted chunk size: {} -> {}", adaptive.current_size() - new_size, new_size);
47/// }
48/// }
49/// ```
50pub struct AdaptiveChunking {
51 /// Current chunk size (mutable, adjusted over time)
52 current_size: usize,
53
54 /// Absolute minimum chunk size (never decrease below this)
55 min_size: usize,
56
57 /// Absolute maximum chunk size (never increase beyond this)
58 max_size: usize,
59
60 /// Number of measurements to collect before making adjustment decision
61 adjustment_window: usize,
62
63 /// Rolling window of recent occupancy observations
64 measurements: VecDeque<Occupancy>,
65
66 /// Timestamp of last chunk size adjustment (for rate limiting)
67 last_adjustment_time: Option<Instant>,
68
69 /// Minimum time between adjustments (prevents thrashing/oscillation)
70 min_adjustment_interval: Duration,
71}
72
73impl AdaptiveChunking {
74 /// Create a new adaptive chunking controller with default bounds
75 ///
76 /// **Defaults**:
77 /// - Initial chunk size: 256 items
78 /// - Min size: 16 items
79 /// - Max size: 1024 items
80 /// - Adjustment window: 50 observations
81 /// - Min adjustment interval: 1 second
82 ///
83 /// # Examples
84 ///
85 /// ```ignore
86 /// let adaptive = AdaptiveChunking::new();
87 /// assert_eq!(adaptive.current_size(), 256);
88 /// ```
89 pub fn new() -> Self {
90 Self {
91 current_size: 256,
92 min_size: 16,
93 max_size: 1024,
94 adjustment_window: 50,
95 measurements: VecDeque::with_capacity(50),
96 last_adjustment_time: None,
97 min_adjustment_interval: Duration::from_secs(1),
98 }
99 }
100
101 /// Record an occupancy observation and check if chunk size adjustment is warranted
102 ///
103 /// Call this method after each chunk is sent to the channel.
104 /// Returns `Some(new_size)` if an adjustment should be applied, `None` otherwise.
105 ///
106 /// # Arguments
107 ///
108 /// * `items_buffered` - Number of items currently in the channel
109 /// * `capacity` - Total capacity of the channel (usually equal to chunk_size)
110 ///
111 /// # Examples
112 ///
113 /// ```ignore
114 /// let mut adaptive = AdaptiveChunking::new();
115 ///
116 /// // Simulate high occupancy (90%)
117 /// for _ in 0..50 {
118 /// adaptive.observe(230, 256); // ~90% occupancy
119 /// }
120 ///
121 /// // On the 51st observation, should trigger adjustment
122 /// if let Some(new_size) = adaptive.observe(230, 256) {
123 /// println!("Adjusted to {}", new_size); // Will be < 256
124 /// }
125 /// ```
126 pub fn observe(&mut self, items_buffered: usize, capacity: usize) -> Option<usize> {
127 // Calculate occupancy percentage (clamped at 100% if buffer exceeds capacity)
128 let pct = if capacity == 0 {
129 0
130 } else {
131 ((items_buffered * 100) / capacity).min(100)
132 };
133
134 // Record this observation
135 self.measurements.push_back(Occupancy { percentage: pct });
136
137 // Keep only the most recent measurements in the window
138 while self.measurements.len() > self.adjustment_window {
139 self.measurements.pop_front();
140 }
141
142 // Only consider adjustment if we have a FULL window of observations
143 // (i.e., exactly equal to the window size, not more)
144 // This ensures we only evaluate after collecting N measurements
145 if self.measurements.len() == self.adjustment_window && self.should_adjust() {
146 return self.calculate_adjustment();
147 }
148
149 None
150 }
151
152 /// Get the current chunk size
153 ///
154 /// # Examples
155 ///
156 /// ```ignore
157 /// let adaptive = AdaptiveChunking::new();
158 /// assert_eq!(adaptive.current_size(), 256);
159 /// ```
160 pub fn current_size(&self) -> usize {
161 self.current_size
162 }
163
164 /// Set custom min/max bounds for chunk size adjustments
165 ///
166 /// Allows overriding the default bounds (16-1024) with custom limits.
167 /// The current chunk size will be clamped to the new bounds.
168 ///
169 /// # Arguments
170 ///
171 /// * `min_size` - Minimum chunk size (must be > 0)
172 /// * `max_size` - Maximum chunk size (must be >= min_size)
173 ///
174 /// # Examples
175 ///
176 /// ```ignore
177 /// let mut adaptive = AdaptiveChunking::new();
178 /// adaptive = adaptive.with_bounds(32, 512); // Custom range 32-512
179 /// assert!(adaptive.current_size() >= 32);
180 /// assert!(adaptive.current_size() <= 512);
181 /// ```
182 pub fn with_bounds(mut self, min_size: usize, max_size: usize) -> Self {
183 // Basic validation
184 if min_size == 0 || max_size < min_size {
185 tracing::warn!(
186 "invalid chunk bounds: min={}, max={}, keeping defaults",
187 min_size,
188 max_size
189 );
190 return self;
191 }
192
193 self.min_size = min_size;
194 self.max_size = max_size;
195
196 // Clamp current size to new bounds
197 if self.current_size < min_size {
198 self.current_size = min_size;
199 } else if self.current_size > max_size {
200 self.current_size = max_size;
201 }
202
203 tracing::debug!(
204 "adaptive chunking bounds set: min={}, max={}, current={}",
205 self.min_size,
206 self.max_size,
207 self.current_size
208 );
209
210 self
211 }
212
213 /// Calculate average occupancy percentage over the measurement window
214 fn average_occupancy(&self) -> usize {
215 if self.measurements.is_empty() {
216 return 0;
217 }
218
219 let sum: usize = self.measurements.iter().map(|m| m.percentage).sum();
220 sum / self.measurements.len()
221 }
222
223 /// Check if adjustment conditions are met
224 ///
225 /// Adjustment is only considered if:
226 /// 1. At least 1 second has elapsed since the last adjustment
227 /// 2. Average occupancy is outside the hysteresis band (< 20% or > 80%)
228 fn should_adjust(&self) -> bool {
229 // Rate limit: don't adjust too frequently
230 if let Some(last_adj) = self.last_adjustment_time {
231 if last_adj.elapsed() < self.min_adjustment_interval {
232 return false;
233 }
234 }
235
236 // Hysteresis: only adjust if we're clearly outside the comfort zone
237 let avg = self.average_occupancy();
238 !(20..=80).contains(&avg)
239 }
240
241 /// Calculate the new chunk size based on average occupancy
242 ///
243 /// **Logic**:
244 /// - If avg > 80%: **DECREASE** by factor of 1.5 (high occupancy = producer backed up)
245 /// - If avg < 20%: **INCREASE** by factor of 1.5 (low occupancy = consumer fast)
246 /// - Clamps to [min_size, max_size]
247 /// - Clears measurements after adjustment
248 ///
249 /// Returns `Some(new_size)` if size actually changed, `None` if no change needed.
250 fn calculate_adjustment(&mut self) -> Option<usize> {
251 let avg = self.average_occupancy();
252 let old_size = self.current_size;
253
254 let new_size = if avg > 80 {
255 // High occupancy: producer is waiting on channel, consumer is slow
256 // → DECREASE chunk_size to reduce backpressure and latency
257 ((self.current_size as f64 / 1.5).floor() as usize).max(self.min_size)
258 } else if avg < 20 {
259 // Low occupancy: consumer is draining fast, producer could batch more
260 // → INCREASE chunk_size to amortize parsing cost and reduce context switches
261 ((self.current_size as f64 * 1.5).ceil() as usize).min(self.max_size)
262 } else {
263 old_size
264 };
265
266 // Only return if there was an actual change
267 if new_size != old_size {
268 self.current_size = new_size;
269 self.last_adjustment_time = Some(Instant::now());
270 self.measurements.clear(); // Reset window for fresh observations
271 Some(new_size)
272 } else {
273 None
274 }
275 }
276}
277
278impl Default for AdaptiveChunking {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_defaults() {
        let adaptive = AdaptiveChunking::new();
        assert_eq!(adaptive.current_size(), 256);
        assert_eq!(adaptive.min_size, 16);
        assert_eq!(adaptive.max_size, 1024);
        assert_eq!(adaptive.adjustment_window, 50);
        assert!(adaptive.last_adjustment_time.is_none());
        assert!(adaptive.measurements.is_empty());
    }

    #[test]
    fn test_no_adjustment_in_hysteresis_band() {
        let mut adaptive = AdaptiveChunking::new();

        // Simulate 50% occupancy (inside 20-80% hysteresis band)
        // 50% of 256 = 128 items
        for _ in 0..50 {
            assert_eq!(adaptive.observe(128, 256), None);
        }

        // Should not adjust - still at 256
        assert_eq!(adaptive.current_size(), 256);
    }

    #[test]
    fn test_decrease_on_high_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;

        // Simulate 90% occupancy (producer backed up, consumer slow)
        // 90% of 256 = 230.4 ≈ 230 items
        for _ in 0..49 {
            assert_eq!(adaptive.observe(230, 256), None);
        }

        // On 50th observation, should trigger adjustment
        let result = adaptive.observe(230, 256);
        assert!(result.is_some());

        let new_size = result.unwrap();
        assert!(
            new_size < original_size,
            "Should decrease on high occupancy"
        );
        assert!(new_size >= 16, "Should respect min bound");

        // floor(256 / 1.5) = 170
        assert_eq!(new_size, 170);
        assert_eq!(adaptive.current_size(), new_size);
    }

    #[test]
    fn test_increase_on_low_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;

        // Simulate 10% occupancy (consumer fast, producer lagging)
        // 10% of 256 = 25.6 ≈ 26 items
        for _ in 0..49 {
            assert_eq!(adaptive.observe(26, 256), None);
        }

        // On 50th observation, should trigger adjustment
        let result = adaptive.observe(26, 256);
        assert!(result.is_some());

        let new_size = result.unwrap();
        assert!(new_size > original_size, "Should increase on low occupancy");
        assert!(new_size <= 1024, "Should respect max bound");

        // ceil(256 * 1.5) = 384
        assert_eq!(new_size, 384);
        assert_eq!(adaptive.current_size(), new_size);
    }

    #[test]
    fn test_respects_min_bound() {
        let mut adaptive = AdaptiveChunking::new();

        // Simulate very high occupancy repeatedly
        for iteration in 0..20 {
            // Bypass the 1-second rate limiter; otherwise only the very first
            // adjustment would ever fire and the bound would never be reached.
            adaptive.last_adjustment_time = None;

            for _ in 0..50 {
                adaptive.observe(250, 256);
            }

            // Verify we never go below minimum
            assert!(
                adaptive.current_size() >= 16,
                "Iteration {}: size {} < min",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 -> 170 -> 113 -> 75 -> 50 -> 33 -> 22 -> 16 (clamped), then pinned
        assert_eq!(
            adaptive.current_size(),
            16,
            "Repeated high occupancy should drive the size down to the min bound"
        );
    }

    #[test]
    fn test_respects_max_bound() {
        let mut adaptive = AdaptiveChunking::new();

        // Simulate very low occupancy repeatedly
        for iteration in 0..20 {
            // Bypass the 1-second rate limiter; otherwise only the very first
            // adjustment would ever fire and the bound would never be reached.
            adaptive.last_adjustment_time = None;

            for _ in 0..50 {
                adaptive.observe(10, 256);
            }

            // Verify we never go above maximum
            assert!(
                adaptive.current_size() <= 1024,
                "Iteration {}: size {} > max",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 -> 384 -> 576 -> 864 -> 1024 (clamped), then pinned
        assert_eq!(
            adaptive.current_size(),
            1024,
            "Repeated low occupancy should drive the size up to the max bound"
        );
    }

    #[test]
    fn test_respects_min_adjustment_interval() {
        let mut adaptive = AdaptiveChunking::new();

        // Fill window with high occupancy (>80%) and trigger first adjustment
        // 230/256 ≈ 89.8%
        // Make 49 calls so window is not yet full
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }

        // 50th call: window becomes full, should trigger adjustment
        let first_adjustment = adaptive.observe(230, 256);
        assert!(
            first_adjustment.is_some(),
            "Should adjust on 50th observation when window is full"
        );

        let first_size = adaptive.current_size();
        assert!(
            first_size < 256,
            "High occupancy should decrease chunk size"
        );

        // Immediately refill the window with the same high occupancy.
        // The window becomes full again, but the 1-second minimum interval
        // has not elapsed, so no second adjustment may happen.
        for _ in 0..50 {
            let result = adaptive.observe(230, 256);
            assert_eq!(
                result, None,
                "Should not adjust again so soon (within min interval)"
            );
        }

        assert_eq!(
            adaptive.current_size(),
            first_size,
            "Size should remain unchanged due to rate limiting"
        );
    }

    #[test]
    fn test_adjusts_again_once_interval_allows() {
        let mut adaptive = AdaptiveChunking::new();

        // First adjustment on the 50th high-occupancy observation
        for _ in 0..50 {
            adaptive.observe(230, 256);
        }
        let first_size = adaptive.current_size();
        assert!(first_size < 256, "First window should decrease chunk size");

        // With a zero interval the rate limiter can no longer block
        adaptive.min_adjustment_interval = std::time::Duration::from_secs(0);

        for _ in 0..49 {
            assert_eq!(
                adaptive.observe(230, 256),
                None,
                "Should not adjust until the fresh window is full"
            );
        }

        let second = adaptive.observe(230, 256);
        assert!(
            second.is_some(),
            "Should adjust again once the interval has elapsed"
        );
        assert!(adaptive.current_size() < first_size);
    }

    #[test]
    fn test_window_resets_after_adjustment() {
        let mut adaptive = AdaptiveChunking::new();

        // First window: high occupancy triggers decrease
        // 230/256 ≈ 89.8%
        // Make 49 calls to fill window to size 49
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }

        // 50th call: window becomes full, triggers adjustment
        let first = adaptive.observe(230, 256);
        assert!(
            first.is_some(),
            "Should adjust when window reaches 50 observations"
        );

        // Measurements should be cleared after adjustment
        assert!(
            adaptive.measurements.is_empty(),
            "Measurements should be cleared after adjustment"
        );
    }

    #[test]
    fn test_zero_capacity_handling() {
        let mut adaptive = AdaptiveChunking::new();

        // Zero capacity edge case: percentage = 0
        // 0% occupancy is OUTSIDE hysteresis band (< 20%), so it WILL increase chunk size
        // Make 49 calls so window is not yet full (size 49 < 50)
        for _ in 0..49 {
            let result = adaptive.observe(0, 0);
            // Should not adjust until window is full (50 observations)
            assert_eq!(result, None, "Should not adjust until window is full");
        }

        // On the 50th observation, window becomes full
        // We should trigger an increase because occupancy < 20%
        let result = adaptive.observe(0, 0);
        assert!(
            result.is_some(),
            "Should increase chunk size when occupancy < 20% and window is full"
        );
        assert!(
            adaptive.current_size() > 256,
            "Should increase from 256 due to low occupancy"
        );
    }

    #[test]
    fn test_occupancy_clamped_when_buffer_exceeds_capacity() {
        let mut adaptive = AdaptiveChunking::new();

        // 512 buffered items in a 256-slot channel would be 200%; it is recorded as 100%
        assert_eq!(adaptive.observe(512, 256), None);
        assert_eq!(adaptive.average_occupancy(), 100);
    }

    #[test]
    fn test_average_occupancy_calculation() {
        let mut adaptive = AdaptiveChunking::new();

        // Add measurements: 10%, 20%, 30%, 40%, 50%
        // Calculate actual item counts: 25.6, 51.2, 76.8, 102.4, 128
        // Which truncate to: 25, 51, 76, 102, 128
        // And percentages: (25*100)/256=9, (51*100)/256=19, (76*100)/256=29, (102*100)/256=39, (128*100)/256=50
        for pct in [10, 20, 30, 40, 50].iter() {
            let items = (pct * 256) / 100;
            adaptive.observe(items, 256);
        }

        let avg = adaptive.average_occupancy();
        // Average of [9, 19, 29, 39, 50] = 146 / 5 = 29 (integer division)
        assert_eq!(
            avg, 29,
            "Average should account for integer division in percentages"
        );
    }

    #[test]
    fn test_with_bounds_clamps_current_size() {
        // Current size (256) above the new maximum is pulled down
        let lowered = AdaptiveChunking::new().with_bounds(32, 128);
        assert_eq!(lowered.min_size, 32);
        assert_eq!(lowered.max_size, 128);
        assert_eq!(lowered.current_size(), 128);

        // Current size (256) below the new minimum is pulled up
        let raised = AdaptiveChunking::new().with_bounds(512, 2048);
        assert_eq!(raised.min_size, 512);
        assert_eq!(raised.max_size, 2048);
        assert_eq!(raised.current_size(), 512);

        // Current size already inside the new bounds is left alone
        let untouched = AdaptiveChunking::new().with_bounds(32, 512);
        assert_eq!(untouched.current_size(), 256);
    }

    #[test]
    fn test_with_bounds_rejects_invalid_bounds() {
        // Zero minimum is invalid: controller is returned unchanged
        let zero_min = AdaptiveChunking::new().with_bounds(0, 100);
        assert_eq!(zero_min.min_size, 16);
        assert_eq!(zero_min.max_size, 1024);
        assert_eq!(zero_min.current_size(), 256);

        // max < min is invalid: controller is returned unchanged
        let inverted = AdaptiveChunking::new().with_bounds(100, 50);
        assert_eq!(inverted.min_size, 16);
        assert_eq!(inverted.max_size, 1024);
        assert_eq!(inverted.current_size(), 256);
    }
}
517}