Skip to main content

fraiseql_wire/stream/
adaptive_chunking.rs

1//! Adaptive chunk sizing based on channel occupancy patterns
2//!
3//! This module implements self-tuning chunk sizes that automatically adjust batch sizes
4//! based on observed backpressure (channel occupancy).
5//!
6//! **Critical Semantics**:
7//! `chunk_size` controls **both**:
8//! 1. MPSC channel capacity (backpressure buffer)
9//! 2. Batch size for Postgres row parsing
10//!
11//! **Control Signal Interpretation**:
12//! - **High occupancy** (>80%): Producer waiting on channel capacity, consumer slow
13//!   → **Reduce `chunk_size`**: smaller batches reduce pressure, lower latency per item
14//!
15//! - **Low occupancy** (<20%): Consumer faster than producer, frequent context switches
16//!   → **Increase `chunk_size`**: larger batches amortize parsing cost, less frequent wakeups
17//!
18//! **Design Principles**:
19//! - Measurement-based adjustment (50-item window) for stability
20//! - Hysteresis band (20%-80%) prevents frequent oscillation
21//! - Minimum adjustment interval (1 second) prevents thrashing
22//! - Conservative bounds (16-1024) prevent pathological extremes
23//! - Clear window reset after adjustment (fresh observations)
24
25use std::collections::VecDeque;
26use std::time::{Duration, Instant};
27
/// One sampled reading of how full the channel was.
#[derive(Debug, Clone, Copy)]
struct Occupancy {
    /// Share of the channel's capacity that was occupied when sampled,
    /// expressed as a whole percentage in the range 0..=100.
    percentage: usize,
}
34
35/// Tracks channel occupancy and automatically adjusts chunk size based on backpressure
36///
37/// # Examples
38///
39/// ```rust
40/// use fraiseql_wire::stream::AdaptiveChunking;
41/// let mut adaptive = AdaptiveChunking::new();
42/// let (buffered_items, channel_capacity) = (50usize, 256usize);
43///
44/// // Periodically observe channel occupancy
45/// for _chunk_sent in 0..100 {
46///     if let Some(new_size) = adaptive.observe(buffered_items, channel_capacity) {
47///         println!("Adjusted chunk size to {}", new_size);
48///     }
49/// }
50/// ```
51pub struct AdaptiveChunking {
52    /// Current chunk size (mutable, adjusted over time)
53    current_size: usize,
54
55    /// Absolute minimum chunk size (never decrease below this)
56    min_size: usize,
57
58    /// Absolute maximum chunk size (never increase beyond this)
59    max_size: usize,
60
61    /// Number of measurements to collect before making adjustment decision
62    adjustment_window: usize,
63
64    /// Rolling window of recent occupancy observations
65    measurements: VecDeque<Occupancy>,
66
67    /// Timestamp of last chunk size adjustment (for rate limiting)
68    last_adjustment_time: Option<Instant>,
69
70    /// Minimum time between adjustments (prevents thrashing/oscillation)
71    min_adjustment_interval: Duration,
72}
73
74impl AdaptiveChunking {
75    /// Create a new adaptive chunking controller with default bounds
76    ///
77    /// **Defaults**:
78    /// - Initial chunk size: 256 items
79    /// - Min size: 16 items
80    /// - Max size: 1024 items
81    /// - Adjustment window: 50 observations
82    /// - Min adjustment interval: 1 second
83    ///
84    /// # Examples
85    ///
86    /// ```rust
87    /// use fraiseql_wire::stream::AdaptiveChunking;
88    /// let adaptive = AdaptiveChunking::new();
89    /// assert_eq!(adaptive.current_size(), 256);
90    /// ```
91    pub fn new() -> Self {
92        Self {
93            current_size: 256,
94            min_size: 16,
95            max_size: 1024,
96            adjustment_window: 50,
97            measurements: VecDeque::with_capacity(50),
98            last_adjustment_time: None,
99            min_adjustment_interval: Duration::from_secs(1),
100        }
101    }
102
103    /// Record an occupancy observation and check if chunk size adjustment is warranted
104    ///
105    /// Call this method after each chunk is sent to the channel.
106    /// Returns `Some(new_size)` if an adjustment should be applied, `None` otherwise.
107    ///
108    /// # Arguments
109    ///
110    /// * `items_buffered` - Number of items currently in the channel
111    /// * `capacity` - Total capacity of the channel (usually equal to `chunk_size`)
112    ///
113    /// # Examples
114    ///
115    /// ```rust
116    /// use fraiseql_wire::stream::AdaptiveChunking;
117    /// let mut adaptive = AdaptiveChunking::new();
118    ///
119    /// // Simulate high occupancy (90%)
120    /// for _ in 0..50 {
121    ///     adaptive.observe(230, 256);  // ~90% occupancy
122    /// }
123    ///
124    /// // On the 51st observation, should trigger adjustment
125    /// if let Some(new_size) = adaptive.observe(230, 256) {
126    ///     println!("Adjusted to {}", new_size);  // Will be < 256
127    /// }
128    /// ```
129    pub fn observe(&mut self, items_buffered: usize, capacity: usize) -> Option<usize> {
130        // Calculate occupancy percentage (clamped at 100% if buffer exceeds capacity)
131        // Special case: if capacity is 0, treat occupancy as 0% (consumer draining instantly)
132        let pct = if capacity == 0 {
133            0
134        } else {
135            (items_buffered * 100)
136                .checked_div(capacity)
137                .unwrap_or(100)
138                .min(100)
139        };
140
141        // Record this observation
142        self.measurements.push_back(Occupancy { percentage: pct });
143
144        // Keep only the most recent measurements in the window
145        while self.measurements.len() > self.adjustment_window {
146            self.measurements.pop_front();
147        }
148
149        // Only consider adjustment if we have a FULL window of observations
150        // (i.e., exactly equal to the window size, not more)
151        // This ensures we only evaluate after collecting N measurements
152        if self.measurements.len() == self.adjustment_window && self.should_adjust() {
153            return self.calculate_adjustment();
154        }
155
156        None
157    }
158
159    /// Get the current chunk size
160    ///
161    /// # Examples
162    ///
163    /// ```rust
164    /// use fraiseql_wire::stream::AdaptiveChunking;
165    /// let adaptive = AdaptiveChunking::new();
166    /// assert_eq!(adaptive.current_size(), 256);
167    /// ```
168    pub const fn current_size(&self) -> usize {
169        self.current_size
170    }
171
172    /// Set custom min/max bounds for chunk size adjustments
173    ///
174    /// Allows overriding the default bounds (16-1024) with custom limits.
175    /// The current chunk size will be clamped to the new bounds.
176    ///
177    /// # Arguments
178    ///
179    /// * `min_size` - Minimum chunk size (must be > 0)
180    /// * `max_size` - Maximum chunk size (must be >= `min_size`)
181    ///
182    /// # Examples
183    ///
184    /// ```rust
185    /// use fraiseql_wire::stream::AdaptiveChunking;
186    /// let mut adaptive = AdaptiveChunking::new();
187    /// adaptive = adaptive.with_bounds(32, 512);  // Custom range 32-512
188    /// assert!(adaptive.current_size() >= 32);
189    /// assert!(adaptive.current_size() <= 512);
190    /// ```
191    pub fn with_bounds(mut self, min_size: usize, max_size: usize) -> Self {
192        // Basic validation
193        if min_size == 0 || max_size < min_size {
194            tracing::warn!(
195                "invalid chunk bounds: min={}, max={}, keeping defaults",
196                min_size,
197                max_size
198            );
199            return self;
200        }
201
202        self.min_size = min_size;
203        self.max_size = max_size;
204
205        // Clamp current size to new bounds
206        if self.current_size < min_size {
207            self.current_size = min_size;
208        } else if self.current_size > max_size {
209            self.current_size = max_size;
210        }
211
212        tracing::debug!(
213            "adaptive chunking bounds set: min={}, max={}, current={}",
214            self.min_size,
215            self.max_size,
216            self.current_size
217        );
218
219        self
220    }
221
222    /// Calculate average occupancy percentage over the measurement window
223    fn average_occupancy(&self) -> usize {
224        if self.measurements.is_empty() {
225            return 0;
226        }
227
228        let sum: usize = self.measurements.iter().map(|m| m.percentage).sum();
229        sum / self.measurements.len()
230    }
231
232    /// Check if adjustment conditions are met
233    ///
234    /// Adjustment is only considered if:
235    /// 1. At least 1 second has elapsed since the last adjustment
236    /// 2. Average occupancy is outside the hysteresis band (< 20% or > 80%)
237    fn should_adjust(&self) -> bool {
238        // Rate limit: don't adjust too frequently
239        if let Some(last_adj) = self.last_adjustment_time {
240            if last_adj.elapsed() < self.min_adjustment_interval {
241                return false;
242            }
243        }
244
245        // Hysteresis: only adjust if we're clearly outside the comfort zone
246        let avg = self.average_occupancy();
247        !(20..=80).contains(&avg)
248    }
249
250    /// Calculate the new chunk size based on average occupancy
251    ///
252    /// **Logic**:
253    /// - If avg > 80%: **DECREASE** by factor of 1.5 (high occupancy = producer backed up)
254    /// - If avg < 20%: **INCREASE** by factor of 1.5 (low occupancy = consumer fast)
255    /// - Clamps to [`min_size`, `max_size`]
256    /// - Clears measurements after adjustment
257    ///
258    /// Returns `Some(new_size)` if size actually changed, `None` if no change needed.
259    fn calculate_adjustment(&mut self) -> Option<usize> {
260        let avg = self.average_occupancy();
261        let old_size = self.current_size;
262
263        let new_size = if avg > 80 {
264            // High occupancy: producer is waiting on channel, consumer is slow
265            // → DECREASE chunk_size to reduce backpressure and latency
266            ((self.current_size as f64 / 1.5).floor() as usize).max(self.min_size)
267        } else if avg < 20 {
268            // Low occupancy: consumer is draining fast, producer could batch more
269            // → INCREASE chunk_size to amortize parsing cost and reduce context switches
270            ((self.current_size as f64 * 1.5).ceil() as usize).min(self.max_size)
271        } else {
272            old_size
273        };
274
275        // Only return if there was an actual change
276        if new_size != old_size {
277            self.current_size = new_size;
278            self.last_adjustment_time = Some(Instant::now());
279            self.measurements.clear(); // Reset window for fresh observations
280            Some(new_size)
281        } else {
282            None
283        }
284    }
285}
286
287impl Default for AdaptiveChunking {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_new_defaults() {
        let adaptive = AdaptiveChunking::new();
        assert_eq!(adaptive.current_size(), 256);
        assert_eq!(adaptive.min_size, 16);
        assert_eq!(adaptive.max_size, 1024);
        assert_eq!(adaptive.adjustment_window, 50);
        assert_eq!(adaptive.min_adjustment_interval, Duration::from_secs(1));
        assert!(adaptive.last_adjustment_time.is_none());
        assert!(adaptive.measurements.is_empty());
    }

    #[test]
    fn test_no_adjustment_in_hysteresis_band() {
        let mut adaptive = AdaptiveChunking::new();

        // Simulate 50% occupancy (inside 20-80% hysteresis band)
        // 50% of 256 = 128 items
        for _ in 0..50 {
            assert_eq!(adaptive.observe(128, 256), None);
        }

        // Should not adjust - still at 256
        assert_eq!(adaptive.current_size(), 256);
    }

    #[test]
    fn test_decrease_on_high_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;

        // Simulate 90% occupancy (producer backed up, consumer slow)
        // 90% of 256 = 230.4 ≈ 230 items
        for _ in 0..49 {
            assert_eq!(adaptive.observe(230, 256), None);
        }

        // On 50th observation, should trigger adjustment
        let result = adaptive.observe(230, 256);
        assert!(result.is_some());

        let new_size = result.unwrap();
        assert!(
            new_size < original_size,
            "Should decrease on high occupancy"
        );
        assert!(new_size >= 16, "Should respect min bound");
        // floor(256 / 1.5) = 170
        assert_eq!(new_size, 170);
        assert_eq!(adaptive.current_size(), 170);
    }

    #[test]
    fn test_increase_on_low_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;

        // Simulate 10% occupancy (consumer fast, producer lagging)
        // 10% of 256 = 25.6 ≈ 26 items
        for _ in 0..49 {
            assert_eq!(adaptive.observe(26, 256), None);
        }

        // On 50th observation, should trigger adjustment
        let result = adaptive.observe(26, 256);
        assert!(result.is_some());

        let new_size = result.unwrap();
        assert!(new_size > original_size, "Should increase on low occupancy");
        assert!(new_size <= 1024, "Should respect max bound");
        // ceil(256 * 1.5) = 384
        assert_eq!(new_size, 384);
        assert_eq!(adaptive.current_size(), 384);
    }

    #[test]
    fn test_respects_min_bound() {
        let mut adaptive = AdaptiveChunking::new();
        // Disable rate limiting so every full window may trigger an adjustment.
        // With the default 1s interval only the first decrease would happen
        // within this test, and the floor would never be reached.
        adaptive.min_adjustment_interval = Duration::from_secs(0);

        // 250/256 ≈ 97% occupancy: one full window per iteration, one decrease each.
        for iteration in 0..20 {
            for _ in 0..50 {
                adaptive.observe(250, 256);
            }

            // Verify we never go below minimum
            assert!(
                adaptive.current_size() >= 16,
                "Iteration {}: size {} < min",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 → 170 → 113 → 75 → 50 → 33 → 22 → 16 (clamped), then stays put.
        assert_eq!(adaptive.current_size(), 16);
    }

    #[test]
    fn test_respects_max_bound() {
        let mut adaptive = AdaptiveChunking::new();
        // Disable rate limiting so every full window may trigger an adjustment.
        // With the default 1s interval only the first increase would happen
        // within this test, and the ceiling would never be reached.
        adaptive.min_adjustment_interval = Duration::from_secs(0);

        // 10/256 ≈ 3% occupancy: one full window per iteration, one increase each.
        for iteration in 0..20 {
            for _ in 0..50 {
                adaptive.observe(10, 256);
            }

            // Verify we never go above maximum
            assert!(
                adaptive.current_size() <= 1024,
                "Iteration {}: size {} > max",
                iteration,
                adaptive.current_size()
            );
        }

        // 256 → 384 → 576 → 864 → 1024 (clamped), then stays put.
        assert_eq!(adaptive.current_size(), 1024);
    }

    #[test]
    fn test_respects_min_adjustment_interval() {
        let mut adaptive = AdaptiveChunking::new();

        // Fill window with high occupancy (>80%) and trigger first adjustment
        // 230/256 ≈ 89.8%
        // Make 49 calls so window is not yet full
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }

        // 50th call: window becomes full, should trigger adjustment
        let first_adjustment = adaptive.observe(230, 256);
        assert!(
            first_adjustment.is_some(),
            "Should adjust on 50th observation when window is full"
        );

        let first_size = adaptive.current_size();
        assert!(
            first_size < 256,
            "High occupancy should decrease chunk size"
        );

        // Immediately build up a new full window with the same high occupancy.
        // No adjustment should happen because of the 1-second minimum interval.
        for _ in 0..50 {
            let result = adaptive.observe(230, 256);
            assert_eq!(
                result, None,
                "Should not adjust again so soon (within min interval)"
            );
        }

        // Should not adjust again immediately, even though window is full again
        assert_eq!(
            adaptive.current_size(),
            first_size,
            "Size should remain unchanged due to rate limiting"
        );
    }

    #[test]
    fn test_window_resets_after_adjustment() {
        let mut adaptive = AdaptiveChunking::new();

        // First window: high occupancy triggers decrease
        // 230/256 ≈ 89.8%
        // Make 49 calls to fill window to size 49
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }

        // 50th call: window becomes full, triggers adjustment
        let first = adaptive.observe(230, 256);
        assert!(
            first.is_some(),
            "Should adjust when window reaches 50 observations"
        );

        // Measurements should be cleared after adjustment
        assert!(
            adaptive.measurements.is_empty(),
            "Measurements should be cleared after adjustment"
        );
        assert!(adaptive.last_adjustment_time.is_some());
    }

    #[test]
    fn test_zero_capacity_handling() {
        let mut adaptive = AdaptiveChunking::new();

        // Zero capacity edge case: percentage = 0
        // 0% occupancy is OUTSIDE hysteresis band (< 20%), so it WILL increase chunk size
        // This makes sense: consumer is draining instantly, we can send bigger batches
        // Make 49 calls so window is not yet full (size 49 < 50)
        for _ in 0..49 {
            let result = adaptive.observe(0, 0);
            // Should not adjust until window is full (50 observations)
            assert_eq!(result, None, "Should not adjust until window is full");
        }

        // On the 50th observation, window becomes full
        // We should trigger an increase because occupancy < 20%
        let result = adaptive.observe(0, 0);
        assert!(
            result.is_some(),
            "Should increase chunk size when occupancy < 20% and window is full"
        );
        assert!(
            adaptive.current_size() > 256,
            "Should increase from 256 due to low occupancy"
        );
    }

    #[test]
    fn test_over_capacity_clamps_to_full() {
        let mut adaptive = AdaptiveChunking::new();

        // More items buffered than the channel holds is reported as 100%, not 117%.
        adaptive.observe(300, 256);
        assert_eq!(adaptive.average_occupancy(), 100);
    }

    #[test]
    fn test_with_bounds_clamps_current_size() {
        let shrunk = AdaptiveChunking::new().with_bounds(32, 128);
        assert_eq!(shrunk.min_size, 32);
        assert_eq!(shrunk.max_size, 128);
        assert_eq!(
            shrunk.current_size(),
            128,
            "256 should clamp down to the new max"
        );

        let grown = AdaptiveChunking::new().with_bounds(512, 2048);
        assert_eq!(
            grown.current_size(),
            512,
            "256 should clamp up to the new min"
        );
    }

    #[test]
    fn test_with_bounds_rejects_invalid() {
        let zero_min = AdaptiveChunking::new().with_bounds(0, 100);
        assert_eq!((zero_min.min_size, zero_min.max_size), (16, 1024));
        assert_eq!(zero_min.current_size(), 256);

        let inverted = AdaptiveChunking::new().with_bounds(200, 100);
        assert_eq!((inverted.min_size, inverted.max_size), (16, 1024));
        assert_eq!(inverted.current_size(), 256);
    }

    #[test]
    fn test_average_occupancy_calculation() {
        let mut adaptive = AdaptiveChunking::new();

        // Add measurements: 10%, 20%, 30%, 40%, 50%
        // Calculate actual item counts: 25.6, 51.2, 76.8, 102.4, 128
        // Which truncate to: 25, 51, 76, 102, 128
        // And percentages: (25*100)/256=9, (51*100)/256=19, (76*100)/256=29, (102*100)/256=39, (128*100)/256=50
        for pct in [10, 20, 30, 40, 50].iter() {
            let items = (pct * 256) / 100;
            adaptive.observe(items, 256);
        }

        let avg = adaptive.average_occupancy();
        // Average of [9, 19, 29, 39, 50] = 146 / 5 = 29 (integer division)
        assert_eq!(
            avg, 29,
            "Average should account for integer division in percentages"
        );
    }
}