Skip to main content

fraiseql_wire/stream/adaptive_chunking/
mod.rs

1//! Adaptive chunk sizing based on channel occupancy patterns
2//!
3//! This module implements self-tuning chunk sizes that automatically adjust batch sizes
4//! based on observed backpressure (channel occupancy).
5//!
6//! **Critical Semantics**:
7//! `chunk_size` controls **both**:
8//! 1. MPSC channel capacity (backpressure buffer)
9//! 2. Batch size for Postgres row parsing
10//!
11//! **Control Signal Interpretation**:
12//! - **High occupancy** (>80%): Producer waiting on channel capacity, consumer slow
13//!   → **Reduce `chunk_size`**: smaller batches reduce pressure, lower latency per item
14//!
15//! - **Low occupancy** (<20%): Consumer faster than producer, frequent context switches
16//!   → **Increase `chunk_size`**: larger batches amortize parsing cost, less frequent wakeups
17//!
18//! **Design Principles**:
//! - Measurement-based adjustment (50-observation window) for stability
20//! - Hysteresis band (20%-80%) prevents frequent oscillation
21//! - Minimum adjustment interval (1 second) prevents thrashing
22//! - Conservative bounds (16-1024) prevent pathological extremes
//! - Measurement window is cleared after each adjustment, so the next decision uses fresh observations
24
25use std::collections::VecDeque;
26use std::time::{Duration, Instant};
27
/// Single observation of channel occupancy
///
/// Instances are recorded by `AdaptiveChunking::observe` and exposed through its public
/// `measurements` window; [`Occupancy::percentage`] lets callers inspect them.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Occupancy {
    /// Percentage of channel capacity in use (0-100)
    percentage: usize,
}

impl Occupancy {
    /// Percentage of channel capacity that was in use when this observation was taken (0-100).
    #[must_use]
    pub const fn percentage(self) -> usize {
        self.percentage
    }
}
34
35/// Tracks channel occupancy and automatically adjusts chunk size based on backpressure
36///
37/// # Examples
38///
39/// ```rust
40/// use fraiseql_wire::stream::AdaptiveChunking;
41/// let mut adaptive = AdaptiveChunking::new();
42/// let (buffered_items, channel_capacity) = (50usize, 256usize);
43///
44/// // Periodically observe channel occupancy
45/// for _chunk_sent in 0..100 {
46///     if let Some(new_size) = adaptive.observe(buffered_items, channel_capacity) {
47///         println!("Adjusted chunk size to {}", new_size);
48///     }
49/// }
50/// ```
51pub struct AdaptiveChunking {
52    /// Current chunk size (mutable, adjusted over time)
53    current_size: usize,
54
55    /// Absolute minimum chunk size (never decrease below this)
56    pub min_size: usize,
57
58    /// Absolute maximum chunk size (never increase beyond this)
59    pub max_size: usize,
60
61    /// Number of measurements to collect before making adjustment decision
62    pub adjustment_window: usize,
63
64    /// Rolling window of recent occupancy observations
65    pub measurements: VecDeque<Occupancy>,
66
67    /// Timestamp of last chunk size adjustment (for rate limiting)
68    pub last_adjustment_time: Option<Instant>,
69
70    /// Minimum time between adjustments (prevents thrashing/oscillation)
71    min_adjustment_interval: Duration,
72}
73
74impl AdaptiveChunking {
75    /// Create a new adaptive chunking controller with default bounds
76    ///
77    /// **Defaults**:
78    /// - Initial chunk size: 256 items
79    /// - Min size: 16 items
80    /// - Max size: 1024 items
81    /// - Adjustment window: 50 observations
82    /// - Min adjustment interval: 1 second
83    ///
84    /// # Examples
85    ///
86    /// ```rust
87    /// use fraiseql_wire::stream::AdaptiveChunking;
88    /// let adaptive = AdaptiveChunking::new();
89    /// assert_eq!(adaptive.current_size(), 256);
90    /// ```
91    #[must_use]
92    pub fn new() -> Self {
93        Self {
94            current_size: 256,
95            min_size: 16,
96            max_size: 1024,
97            adjustment_window: 50,
98            measurements: VecDeque::with_capacity(50),
99            last_adjustment_time: None,
100            min_adjustment_interval: Duration::from_secs(1),
101        }
102    }
103
104    /// Record an occupancy observation and check if chunk size adjustment is warranted
105    ///
106    /// Call this method after each chunk is sent to the channel.
107    /// Returns `Some(new_size)` if an adjustment should be applied, `None` otherwise.
108    ///
109    /// # Arguments
110    ///
111    /// * `items_buffered` - Number of items currently in the channel
112    /// * `capacity` - Total capacity of the channel (usually equal to `chunk_size`)
113    ///
114    /// # Examples
115    ///
116    /// ```rust
117    /// use fraiseql_wire::stream::AdaptiveChunking;
118    /// let mut adaptive = AdaptiveChunking::new();
119    ///
120    /// // Simulate high occupancy (90%)
121    /// for _ in 0..50 {
122    ///     adaptive.observe(230, 256);  // ~90% occupancy
123    /// }
124    ///
125    /// // On the 51st observation, should trigger adjustment
126    /// if let Some(new_size) = adaptive.observe(230, 256) {
127    ///     println!("Adjusted to {}", new_size);  // Will be < 256
128    /// }
129    /// ```
130    pub fn observe(&mut self, items_buffered: usize, capacity: usize) -> Option<usize> {
131        // Calculate occupancy percentage (clamped at 100% if buffer exceeds capacity)
132        // Special case: if capacity is 0, treat occupancy as 0% (consumer draining instantly)
133        let pct = if capacity == 0 {
134            0
135        } else {
136            (items_buffered * 100)
137                .checked_div(capacity)
138                .unwrap_or(100)
139                .min(100)
140        };
141
142        // Record this observation
143        self.measurements.push_back(Occupancy { percentage: pct });
144
145        // Keep only the most recent measurements in the window
146        while self.measurements.len() > self.adjustment_window {
147            self.measurements.pop_front();
148        }
149
150        // Only consider adjustment if we have a FULL window of observations
151        // (i.e., exactly equal to the window size, not more)
152        // This ensures we only evaluate after collecting N measurements
153        if self.measurements.len() == self.adjustment_window && self.should_adjust() {
154            return self.calculate_adjustment();
155        }
156
157        None
158    }
159
160    /// Get the current chunk size
161    ///
162    /// # Examples
163    ///
164    /// ```rust
165    /// use fraiseql_wire::stream::AdaptiveChunking;
166    /// let adaptive = AdaptiveChunking::new();
167    /// assert_eq!(adaptive.current_size(), 256);
168    /// ```
169    #[must_use]
170    pub const fn current_size(&self) -> usize {
171        self.current_size
172    }
173
174    /// Set custom min/max bounds for chunk size adjustments
175    ///
176    /// Allows overriding the default bounds (16-1024) with custom limits.
177    /// The current chunk size will be clamped to the new bounds.
178    ///
179    /// # Arguments
180    ///
181    /// * `min_size` - Minimum chunk size (must be > 0)
182    /// * `max_size` - Maximum chunk size (must be >= `min_size`)
183    ///
184    /// # Examples
185    ///
186    /// ```rust
187    /// use fraiseql_wire::stream::AdaptiveChunking;
188    /// let mut adaptive = AdaptiveChunking::new();
189    /// adaptive = adaptive.with_bounds(32, 512);  // Custom range 32-512
190    /// assert!(adaptive.current_size() >= 32);
191    /// assert!(adaptive.current_size() <= 512);
192    /// ```
193    pub fn with_bounds(mut self, min_size: usize, max_size: usize) -> Self {
194        // Basic validation
195        if min_size == 0 || max_size < min_size {
196            tracing::warn!(
197                "invalid chunk bounds: min={}, max={}, keeping defaults",
198                min_size,
199                max_size
200            );
201            return self;
202        }
203
204        self.min_size = min_size;
205        self.max_size = max_size;
206
207        // Clamp current size to new bounds
208        if self.current_size < min_size {
209            self.current_size = min_size;
210        } else if self.current_size > max_size {
211            self.current_size = max_size;
212        }
213
214        tracing::debug!(
215            "adaptive chunking bounds set: min={}, max={}, current={}",
216            self.min_size,
217            self.max_size,
218            self.current_size
219        );
220
221        self
222    }
223
224    /// Calculate average occupancy percentage over the measurement window
225    #[must_use]
226    pub fn average_occupancy(&self) -> usize {
227        if self.measurements.is_empty() {
228            return 0;
229        }
230
231        let sum: usize = self.measurements.iter().map(|m| m.percentage).sum();
232        sum / self.measurements.len()
233    }
234
235    /// Check if adjustment conditions are met
236    ///
237    /// Adjustment is only considered if:
238    /// 1. At least 1 second has elapsed since the last adjustment
239    /// 2. Average occupancy is outside the hysteresis band (< 20% or > 80%)
240    fn should_adjust(&self) -> bool {
241        // Rate limit: don't adjust too frequently
242        if let Some(last_adj) = self.last_adjustment_time {
243            if last_adj.elapsed() < self.min_adjustment_interval {
244                return false;
245            }
246        }
247
248        // Hysteresis: only adjust if we're clearly outside the comfort zone
249        let avg = self.average_occupancy();
250        !(20..=80).contains(&avg)
251    }
252
253    /// Calculate the new chunk size based on average occupancy
254    ///
255    /// **Logic**:
256    /// - If avg > 80%: **DECREASE** by factor of 1.5 (high occupancy = producer backed up)
257    /// - If avg < 20%: **INCREASE** by factor of 1.5 (low occupancy = consumer fast)
258    /// - Clamps to [`min_size`, `max_size`]
259    /// - Clears measurements after adjustment
260    ///
261    /// Returns `Some(new_size)` if size actually changed, `None` if no change needed.
262    fn calculate_adjustment(&mut self) -> Option<usize> {
263        let avg = self.average_occupancy();
264        let old_size = self.current_size;
265
266        let new_size = if avg > 80 {
267            // High occupancy: producer is waiting on channel, consumer is slow
268            // → DECREASE chunk_size to reduce backpressure and latency
269            ((self.current_size as f64 / 1.5).floor() as usize).max(self.min_size)
270        } else if avg < 20 {
271            // Low occupancy: consumer is draining fast, producer could batch more
272            // → INCREASE chunk_size to amortize parsing cost and reduce context switches
273            ((self.current_size as f64 * 1.5).ceil() as usize).min(self.max_size)
274        } else {
275            old_size
276        };
277
278        // Only return if there was an actual change
279        if new_size != old_size {
280            self.current_size = new_size;
281            self.last_adjustment_time = Some(Instant::now());
282            self.measurements.clear(); // Reset window for fresh observations
283            Some(new_size)
284        } else {
285            None
286        }
287    }
288}
289
290impl Default for AdaptiveChunking {
291    fn default() -> Self {
292        Self::new()
293    }
294}
295
// Unit tests live in the `tests` submodule file and are compiled only for test builds.
#[cfg(test)]
mod tests;