Skip to main content

fraiseql_wire/stream/
adaptive_chunking.rs

1//! Adaptive chunk sizing based on channel occupancy patterns
2//!
3//! This module implements self-tuning chunk sizes that automatically adjust batch sizes
4//! based on observed backpressure (channel occupancy).
5//!
6//! **Critical Semantics**:
7//! `chunk_size` controls **both**:
8//! 1. MPSC channel capacity (backpressure buffer)
9//! 2. Batch size for Postgres row parsing
10//!
11//! **Control Signal Interpretation**:
12//! - **High occupancy** (>80%): Producer waiting on channel capacity, consumer slow
13//!   → **Reduce chunk_size**: smaller batches reduce pressure, lower latency per item
14//!
15//! - **Low occupancy** (<20%): Consumer faster than producer, frequent context switches
16//!   → **Increase chunk_size**: larger batches amortize parsing cost, less frequent wakeups
17//!
18//! **Design Principles**:
19//! - Measurement-based adjustment (50-item window) for stability
20//! - Hysteresis band (20%-80%) prevents frequent oscillation
21//! - Minimum adjustment interval (1 second) prevents thrashing
22//! - Conservative bounds (16-1024) prevent pathological extremes
23//! - Clear window reset after adjustment (fresh observations)
24
25use std::collections::VecDeque;
26use std::time::{Duration, Instant};
27
/// One sample of how full the channel was at the moment of observation.
#[derive(Clone, Copy, Debug)]
struct Occupancy {
    /// Share of the channel capacity that was in use, as a whole percent (0-100).
    percentage: usize,
}
34
35/// Tracks channel occupancy and automatically adjusts chunk size based on backpressure
36///
37/// # Examples
38///
39/// ```ignore
40/// let mut adaptive = AdaptiveChunking::new();
41///
42/// // Periodically observe channel occupancy
43/// for chunk_sent in 0..100 {
44///     let occupancy_pct = (buffered_items * 100) / channel_capacity;
45///     if let Some(new_size) = adaptive.observe(buffered_items, channel_capacity) {
46///         println!("Adjusted chunk size: {} -> {}", adaptive.current_size() - new_size, new_size);
47///     }
48/// }
49/// ```
50pub struct AdaptiveChunking {
51    /// Current chunk size (mutable, adjusted over time)
52    current_size: usize,
53
54    /// Absolute minimum chunk size (never decrease below this)
55    min_size: usize,
56
57    /// Absolute maximum chunk size (never increase beyond this)
58    max_size: usize,
59
60    /// Number of measurements to collect before making adjustment decision
61    adjustment_window: usize,
62
63    /// Rolling window of recent occupancy observations
64    measurements: VecDeque<Occupancy>,
65
66    /// Timestamp of last chunk size adjustment (for rate limiting)
67    last_adjustment_time: Option<Instant>,
68
69    /// Minimum time between adjustments (prevents thrashing/oscillation)
70    min_adjustment_interval: Duration,
71}
72
73impl AdaptiveChunking {
74    /// Create a new adaptive chunking controller with default bounds
75    ///
76    /// **Defaults**:
77    /// - Initial chunk size: 256 items
78    /// - Min size: 16 items
79    /// - Max size: 1024 items
80    /// - Adjustment window: 50 observations
81    /// - Min adjustment interval: 1 second
82    ///
83    /// # Examples
84    ///
85    /// ```ignore
86    /// let adaptive = AdaptiveChunking::new();
87    /// assert_eq!(adaptive.current_size(), 256);
88    /// ```
89    pub fn new() -> Self {
90        Self {
91            current_size: 256,
92            min_size: 16,
93            max_size: 1024,
94            adjustment_window: 50,
95            measurements: VecDeque::with_capacity(50),
96            last_adjustment_time: None,
97            min_adjustment_interval: Duration::from_secs(1),
98        }
99    }
100
101    /// Record an occupancy observation and check if chunk size adjustment is warranted
102    ///
103    /// Call this method after each chunk is sent to the channel.
104    /// Returns `Some(new_size)` if an adjustment should be applied, `None` otherwise.
105    ///
106    /// # Arguments
107    ///
108    /// * `items_buffered` - Number of items currently in the channel
109    /// * `capacity` - Total capacity of the channel (usually equal to chunk_size)
110    ///
111    /// # Examples
112    ///
113    /// ```ignore
114    /// let mut adaptive = AdaptiveChunking::new();
115    ///
116    /// // Simulate high occupancy (90%)
117    /// for _ in 0..50 {
118    ///     adaptive.observe(230, 256);  // ~90% occupancy
119    /// }
120    ///
121    /// // On the 51st observation, should trigger adjustment
122    /// if let Some(new_size) = adaptive.observe(230, 256) {
123    ///     println!("Adjusted to {}", new_size);  // Will be < 256
124    /// }
125    /// ```
126    pub fn observe(&mut self, items_buffered: usize, capacity: usize) -> Option<usize> {
127        // Calculate occupancy percentage (clamped at 100% if buffer exceeds capacity)
128        // Special case: if capacity is 0, treat occupancy as 0% (consumer draining instantly)
129        let pct = if capacity == 0 {
130            0
131        } else {
132            (items_buffered * 100)
133                .checked_div(capacity)
134                .unwrap_or(100)
135                .min(100)
136        };
137
138        // Record this observation
139        self.measurements.push_back(Occupancy { percentage: pct });
140
141        // Keep only the most recent measurements in the window
142        while self.measurements.len() > self.adjustment_window {
143            self.measurements.pop_front();
144        }
145
146        // Only consider adjustment if we have a FULL window of observations
147        // (i.e., exactly equal to the window size, not more)
148        // This ensures we only evaluate after collecting N measurements
149        if self.measurements.len() == self.adjustment_window && self.should_adjust() {
150            return self.calculate_adjustment();
151        }
152
153        None
154    }
155
156    /// Get the current chunk size
157    ///
158    /// # Examples
159    ///
160    /// ```ignore
161    /// let adaptive = AdaptiveChunking::new();
162    /// assert_eq!(adaptive.current_size(), 256);
163    /// ```
164    pub fn current_size(&self) -> usize {
165        self.current_size
166    }
167
168    /// Set custom min/max bounds for chunk size adjustments
169    ///
170    /// Allows overriding the default bounds (16-1024) with custom limits.
171    /// The current chunk size will be clamped to the new bounds.
172    ///
173    /// # Arguments
174    ///
175    /// * `min_size` - Minimum chunk size (must be > 0)
176    /// * `max_size` - Maximum chunk size (must be >= min_size)
177    ///
178    /// # Examples
179    ///
180    /// ```ignore
181    /// let mut adaptive = AdaptiveChunking::new();
182    /// adaptive = adaptive.with_bounds(32, 512);  // Custom range 32-512
183    /// assert!(adaptive.current_size() >= 32);
184    /// assert!(adaptive.current_size() <= 512);
185    /// ```
186    pub fn with_bounds(mut self, min_size: usize, max_size: usize) -> Self {
187        // Basic validation
188        if min_size == 0 || max_size < min_size {
189            tracing::warn!(
190                "invalid chunk bounds: min={}, max={}, keeping defaults",
191                min_size,
192                max_size
193            );
194            return self;
195        }
196
197        self.min_size = min_size;
198        self.max_size = max_size;
199
200        // Clamp current size to new bounds
201        if self.current_size < min_size {
202            self.current_size = min_size;
203        } else if self.current_size > max_size {
204            self.current_size = max_size;
205        }
206
207        tracing::debug!(
208            "adaptive chunking bounds set: min={}, max={}, current={}",
209            self.min_size,
210            self.max_size,
211            self.current_size
212        );
213
214        self
215    }
216
217    /// Calculate average occupancy percentage over the measurement window
218    fn average_occupancy(&self) -> usize {
219        if self.measurements.is_empty() {
220            return 0;
221        }
222
223        let sum: usize = self.measurements.iter().map(|m| m.percentage).sum();
224        sum / self.measurements.len()
225    }
226
227    /// Check if adjustment conditions are met
228    ///
229    /// Adjustment is only considered if:
230    /// 1. At least 1 second has elapsed since the last adjustment
231    /// 2. Average occupancy is outside the hysteresis band (< 20% or > 80%)
232    fn should_adjust(&self) -> bool {
233        // Rate limit: don't adjust too frequently
234        if let Some(last_adj) = self.last_adjustment_time {
235            if last_adj.elapsed() < self.min_adjustment_interval {
236                return false;
237            }
238        }
239
240        // Hysteresis: only adjust if we're clearly outside the comfort zone
241        let avg = self.average_occupancy();
242        !(20..=80).contains(&avg)
243    }
244
245    /// Calculate the new chunk size based on average occupancy
246    ///
247    /// **Logic**:
248    /// - If avg > 80%: **DECREASE** by factor of 1.5 (high occupancy = producer backed up)
249    /// - If avg < 20%: **INCREASE** by factor of 1.5 (low occupancy = consumer fast)
250    /// - Clamps to [min_size, max_size]
251    /// - Clears measurements after adjustment
252    ///
253    /// Returns `Some(new_size)` if size actually changed, `None` if no change needed.
254    fn calculate_adjustment(&mut self) -> Option<usize> {
255        let avg = self.average_occupancy();
256        let old_size = self.current_size;
257
258        let new_size = if avg > 80 {
259            // High occupancy: producer is waiting on channel, consumer is slow
260            // → DECREASE chunk_size to reduce backpressure and latency
261            ((self.current_size as f64 / 1.5).floor() as usize).max(self.min_size)
262        } else if avg < 20 {
263            // Low occupancy: consumer is draining fast, producer could batch more
264            // → INCREASE chunk_size to amortize parsing cost and reduce context switches
265            ((self.current_size as f64 * 1.5).ceil() as usize).min(self.max_size)
266        } else {
267            old_size
268        };
269
270        // Only return if there was an actual change
271        if new_size != old_size {
272            self.current_size = new_size;
273            self.last_adjustment_time = Some(Instant::now());
274            self.measurements.clear(); // Reset window for fresh observations
275            Some(new_size)
276        } else {
277            None
278        }
279    }
280}
281
282impl Default for AdaptiveChunking {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `count` identical observations and asserts that none of them
    /// triggers an adjustment; `why` is included in the failure message.
    fn observe_without_adjustment(
        adaptive: &mut AdaptiveChunking,
        items_buffered: usize,
        capacity: usize,
        count: usize,
        why: &str,
    ) {
        for call in 1..=count {
            assert_eq!(
                adaptive.observe(items_buffered, capacity),
                None,
                "observation {}: {}",
                call,
                why
            );
        }
    }

    #[test]
    fn test_new_defaults() {
        let adaptive = AdaptiveChunking::new();
        assert_eq!(adaptive.current_size(), 256);
        assert_eq!(adaptive.min_size, 16);
        assert_eq!(adaptive.max_size, 1024);
        assert_eq!(adaptive.adjustment_window, 50);
        assert!(adaptive.last_adjustment_time.is_none());
        assert!(adaptive.measurements.is_empty());
    }

    #[test]
    fn test_no_adjustment_in_hysteresis_band() {
        let mut adaptive = AdaptiveChunking::new();

        // 128/256 = 50% occupancy, inside the 20-80% hysteresis band
        observe_without_adjustment(&mut adaptive, 128, 256, 50, "inside hysteresis band");

        // Should not adjust - still at 256
        assert_eq!(adaptive.current_size(), 256);
    }

    #[test]
    fn test_decrease_on_high_occupancy() {
        let mut adaptive = AdaptiveChunking::new();

        // 230/256 ≈ 89.8% occupancy (producer backed up, consumer slow)
        observe_without_adjustment(&mut adaptive, 230, 256, 49, "window not full yet");

        // The 50th observation fills the window: floor(256 / 1.5) = 170
        assert_eq!(
            adaptive.observe(230, 256),
            Some(170),
            "Should decrease on high occupancy"
        );
        assert_eq!(adaptive.current_size(), 170);
    }

    #[test]
    fn test_increase_on_low_occupancy() {
        let mut adaptive = AdaptiveChunking::new();

        // 26/256 ≈ 10.2% occupancy (consumer fast, producer lagging)
        observe_without_adjustment(&mut adaptive, 26, 256, 49, "window not full yet");

        // The 50th observation fills the window: ceil(256 * 1.5) = 384
        assert_eq!(
            adaptive.observe(26, 256),
            Some(384),
            "Should increase on low occupancy"
        );
        assert_eq!(adaptive.current_size(), 384);
    }

    #[test]
    fn test_respects_min_bound() {
        let mut adaptive = AdaptiveChunking::new();
        // Without this the 1-second rate limit would allow only a single
        // adjustment during the test and the bound would never be reached.
        adaptive.min_adjustment_interval = Duration::from_secs(0);

        // Very high occupancy, one full window per iteration
        for iteration in 0..20 {
            for _ in 0..50 {
                adaptive.observe(250, 256);
            }

            // Verify we never go below minimum
            assert!(
                adaptive.current_size() >= 16,
                "Iteration {}: size {} < min",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 → 170 → 113 → 75 → 50 → 33 → 22 → 16, then pinned at the bound
        assert_eq!(
            adaptive.current_size(),
            16,
            "Sustained high occupancy should settle on the min bound"
        );
    }

    #[test]
    fn test_respects_max_bound() {
        let mut adaptive = AdaptiveChunking::new();
        // Without this the 1-second rate limit would allow only a single
        // adjustment during the test and the bound would never be reached.
        adaptive.min_adjustment_interval = Duration::from_secs(0);

        // Very low occupancy, one full window per iteration
        for iteration in 0..20 {
            for _ in 0..50 {
                adaptive.observe(10, 256);
            }

            // Verify we never go above maximum
            assert!(
                adaptive.current_size() <= 1024,
                "Iteration {}: size {} > max",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 → 384 → 576 → 864 → 1024, then pinned at the bound
        assert_eq!(
            adaptive.current_size(),
            1024,
            "Sustained low occupancy should settle on the max bound"
        );
    }

    #[test]
    fn test_respects_min_adjustment_interval() {
        let mut adaptive = AdaptiveChunking::new();

        // 230/256 ≈ 89.8%: 49 calls leave the window one short of full
        observe_without_adjustment(
            &mut adaptive,
            230,
            256,
            49,
            "Should not adjust yet, window not full",
        );

        // 50th call: window becomes full, should trigger adjustment
        let first_adjustment = adaptive.observe(230, 256);
        assert!(
            first_adjustment.is_some(),
            "Should adjust on 50th observation when window is full"
        );

        let first_size = adaptive.current_size();
        assert!(
            first_size < 256,
            "High occupancy should decrease chunk size"
        );

        // Refill the window right away: the 1-second minimum interval has not
        // elapsed, so no further adjustment may happen.
        observe_without_adjustment(
            &mut adaptive,
            230,
            256,
            50,
            "Should not adjust again so soon (within min interval)",
        );

        assert_eq!(
            adaptive.current_size(),
            first_size,
            "Size should remain unchanged due to rate limiting"
        );
    }

    #[test]
    fn test_window_resets_after_adjustment() {
        let mut adaptive = AdaptiveChunking::new();

        // 230/256 ≈ 89.8%: 49 calls leave the window one short of full
        observe_without_adjustment(
            &mut adaptive,
            230,
            256,
            49,
            "Should not adjust yet, window not full",
        );

        // 50th call: window becomes full, triggers adjustment
        let first = adaptive.observe(230, 256);
        assert!(
            first.is_some(),
            "Should adjust when window reaches 50 observations"
        );

        // Measurements should be cleared after adjustment
        assert!(
            adaptive.measurements.is_empty(),
            "Measurements should be cleared after adjustment"
        );
    }

    #[test]
    fn test_zero_capacity_handling() {
        let mut adaptive = AdaptiveChunking::new();

        // Zero capacity is recorded as 0% occupancy, which lies below the
        // hysteresis band, so a full window of it grows the chunk size.
        observe_without_adjustment(
            &mut adaptive,
            0,
            0,
            49,
            "Should not adjust until window is full",
        );

        // On the 50th observation the window becomes full
        let result = adaptive.observe(0, 0);
        assert!(
            result.is_some(),
            "Should increase chunk size when occupancy < 20% and window is full"
        );
        assert!(
            adaptive.current_size() > 256,
            "Should increase from 256 due to low occupancy"
        );
    }

    #[test]
    fn test_average_occupancy_calculation() {
        let mut adaptive = AdaptiveChunking::new();

        // Target percentages 10, 20, 30, 40, 50 of capacity 256 give item counts
        // 25, 51, 76, 102, 128 after truncation, which are recorded as
        // 9%, 19%, 29%, 39%, 50% (integer division again).
        for &pct in &[10usize, 20, 30, 40, 50] {
            let items = (pct * 256) / 100;
            adaptive.observe(items, 256);
        }

        let avg = adaptive.average_occupancy();
        // Average of [9, 19, 29, 39, 50] = 146 / 5 = 29 (integer division)
        assert_eq!(
            avg, 29,
            "Average should account for integer division in percentages"
        );
    }
}