fraiseql_wire/stream/adaptive_chunking/mod.rs
1//! Adaptive chunk sizing based on channel occupancy patterns
2//!
3//! This module implements self-tuning chunk sizes that automatically adjust batch sizes
4//! based on observed backpressure (channel occupancy).
5//!
6//! **Critical Semantics**:
7//! `chunk_size` controls **both**:
8//! 1. MPSC channel capacity (backpressure buffer)
9//! 2. Batch size for Postgres row parsing
10//!
11//! **Control Signal Interpretation**:
12//! - **High occupancy** (>80%): Producer waiting on channel capacity, consumer slow
13//! → **Reduce `chunk_size`**: smaller batches reduce pressure, lower latency per item
14//!
15//! - **Low occupancy** (<20%): Consumer faster than producer, frequent context switches
16//! → **Increase `chunk_size`**: larger batches amortize parsing cost, less frequent wakeups
17//!
18//! **Design Principles**:
19//! - Measurement-based adjustment (50-item window) for stability
20//! - Hysteresis band (20%-80%) prevents frequent oscillation
21//! - Minimum adjustment interval (1 second) prevents thrashing
22//! - Conservative bounds (16-1024) prevent pathological extremes
//! - Measurement window is cleared after every adjustment, so the next decision uses fresh observations
24
25use std::collections::VecDeque;
26use std::time::{Duration, Instant};
27
/// One sample of how full the channel was at the moment it was observed
#[derive(Debug, Clone, Copy)]
pub struct Occupancy {
    /// Share of the channel capacity that was in use, as a whole percentage (0-100)
    percentage: usize,
}
34
35/// Tracks channel occupancy and automatically adjusts chunk size based on backpressure
36///
37/// # Examples
38///
39/// ```rust
40/// use fraiseql_wire::stream::AdaptiveChunking;
41/// let mut adaptive = AdaptiveChunking::new();
42/// let (buffered_items, channel_capacity) = (50usize, 256usize);
43///
44/// // Periodically observe channel occupancy
45/// for _chunk_sent in 0..100 {
46/// if let Some(new_size) = adaptive.observe(buffered_items, channel_capacity) {
47/// println!("Adjusted chunk size to {}", new_size);
48/// }
49/// }
50/// ```
51pub struct AdaptiveChunking {
52 /// Current chunk size (mutable, adjusted over time)
53 current_size: usize,
54
55 /// Absolute minimum chunk size (never decrease below this)
56 pub min_size: usize,
57
58 /// Absolute maximum chunk size (never increase beyond this)
59 pub max_size: usize,
60
61 /// Number of measurements to collect before making adjustment decision
62 pub adjustment_window: usize,
63
64 /// Rolling window of recent occupancy observations
65 pub measurements: VecDeque<Occupancy>,
66
67 /// Timestamp of last chunk size adjustment (for rate limiting)
68 pub last_adjustment_time: Option<Instant>,
69
70 /// Minimum time between adjustments (prevents thrashing/oscillation)
71 min_adjustment_interval: Duration,
72}
73
74impl AdaptiveChunking {
75 /// Create a new adaptive chunking controller with default bounds
76 ///
77 /// **Defaults**:
78 /// - Initial chunk size: 256 items
79 /// - Min size: 16 items
80 /// - Max size: 1024 items
81 /// - Adjustment window: 50 observations
82 /// - Min adjustment interval: 1 second
83 ///
84 /// # Examples
85 ///
86 /// ```rust
87 /// use fraiseql_wire::stream::AdaptiveChunking;
88 /// let adaptive = AdaptiveChunking::new();
89 /// assert_eq!(adaptive.current_size(), 256);
90 /// ```
91 #[must_use]
92 pub fn new() -> Self {
93 Self {
94 current_size: 256,
95 min_size: 16,
96 max_size: 1024,
97 adjustment_window: 50,
98 measurements: VecDeque::with_capacity(50),
99 last_adjustment_time: None,
100 min_adjustment_interval: Duration::from_secs(1),
101 }
102 }
103
104 /// Record an occupancy observation and check if chunk size adjustment is warranted
105 ///
106 /// Call this method after each chunk is sent to the channel.
107 /// Returns `Some(new_size)` if an adjustment should be applied, `None` otherwise.
108 ///
109 /// # Arguments
110 ///
111 /// * `items_buffered` - Number of items currently in the channel
112 /// * `capacity` - Total capacity of the channel (usually equal to `chunk_size`)
113 ///
114 /// # Examples
115 ///
116 /// ```rust
117 /// use fraiseql_wire::stream::AdaptiveChunking;
118 /// let mut adaptive = AdaptiveChunking::new();
119 ///
120 /// // Simulate high occupancy (90%)
121 /// for _ in 0..50 {
122 /// adaptive.observe(230, 256); // ~90% occupancy
123 /// }
124 ///
125 /// // On the 51st observation, should trigger adjustment
126 /// if let Some(new_size) = adaptive.observe(230, 256) {
127 /// println!("Adjusted to {}", new_size); // Will be < 256
128 /// }
129 /// ```
130 pub fn observe(&mut self, items_buffered: usize, capacity: usize) -> Option<usize> {
131 // Calculate occupancy percentage (clamped at 100% if buffer exceeds capacity)
132 // Special case: if capacity is 0, treat occupancy as 0% (consumer draining instantly)
133 let pct = if capacity == 0 {
134 0
135 } else {
136 (items_buffered * 100)
137 .checked_div(capacity)
138 .unwrap_or(100)
139 .min(100)
140 };
141
142 // Record this observation
143 self.measurements.push_back(Occupancy { percentage: pct });
144
145 // Keep only the most recent measurements in the window
146 while self.measurements.len() > self.adjustment_window {
147 self.measurements.pop_front();
148 }
149
150 // Only consider adjustment if we have a FULL window of observations
151 // (i.e., exactly equal to the window size, not more)
152 // This ensures we only evaluate after collecting N measurements
153 if self.measurements.len() == self.adjustment_window && self.should_adjust() {
154 return self.calculate_adjustment();
155 }
156
157 None
158 }
159
160 /// Get the current chunk size
161 ///
162 /// # Examples
163 ///
164 /// ```rust
165 /// use fraiseql_wire::stream::AdaptiveChunking;
166 /// let adaptive = AdaptiveChunking::new();
167 /// assert_eq!(adaptive.current_size(), 256);
168 /// ```
169 #[must_use]
170 pub const fn current_size(&self) -> usize {
171 self.current_size
172 }
173
174 /// Set custom min/max bounds for chunk size adjustments
175 ///
176 /// Allows overriding the default bounds (16-1024) with custom limits.
177 /// The current chunk size will be clamped to the new bounds.
178 ///
179 /// # Arguments
180 ///
181 /// * `min_size` - Minimum chunk size (must be > 0)
182 /// * `max_size` - Maximum chunk size (must be >= `min_size`)
183 ///
184 /// # Examples
185 ///
186 /// ```rust
187 /// use fraiseql_wire::stream::AdaptiveChunking;
188 /// let mut adaptive = AdaptiveChunking::new();
189 /// adaptive = adaptive.with_bounds(32, 512); // Custom range 32-512
190 /// assert!(adaptive.current_size() >= 32);
191 /// assert!(adaptive.current_size() <= 512);
192 /// ```
193 pub fn with_bounds(mut self, min_size: usize, max_size: usize) -> Self {
194 // Basic validation
195 if min_size == 0 || max_size < min_size {
196 tracing::warn!(
197 "invalid chunk bounds: min={}, max={}, keeping defaults",
198 min_size,
199 max_size
200 );
201 return self;
202 }
203
204 self.min_size = min_size;
205 self.max_size = max_size;
206
207 // Clamp current size to new bounds
208 if self.current_size < min_size {
209 self.current_size = min_size;
210 } else if self.current_size > max_size {
211 self.current_size = max_size;
212 }
213
214 tracing::debug!(
215 "adaptive chunking bounds set: min={}, max={}, current={}",
216 self.min_size,
217 self.max_size,
218 self.current_size
219 );
220
221 self
222 }
223
224 /// Calculate average occupancy percentage over the measurement window
225 #[must_use]
226 pub fn average_occupancy(&self) -> usize {
227 if self.measurements.is_empty() {
228 return 0;
229 }
230
231 let sum: usize = self.measurements.iter().map(|m| m.percentage).sum();
232 sum / self.measurements.len()
233 }
234
235 /// Check if adjustment conditions are met
236 ///
237 /// Adjustment is only considered if:
238 /// 1. At least 1 second has elapsed since the last adjustment
239 /// 2. Average occupancy is outside the hysteresis band (< 20% or > 80%)
240 fn should_adjust(&self) -> bool {
241 // Rate limit: don't adjust too frequently
242 if let Some(last_adj) = self.last_adjustment_time {
243 if last_adj.elapsed() < self.min_adjustment_interval {
244 return false;
245 }
246 }
247
248 // Hysteresis: only adjust if we're clearly outside the comfort zone
249 let avg = self.average_occupancy();
250 !(20..=80).contains(&avg)
251 }
252
253 /// Calculate the new chunk size based on average occupancy
254 ///
255 /// **Logic**:
256 /// - If avg > 80%: **DECREASE** by factor of 1.5 (high occupancy = producer backed up)
257 /// - If avg < 20%: **INCREASE** by factor of 1.5 (low occupancy = consumer fast)
258 /// - Clamps to [`min_size`, `max_size`]
259 /// - Clears measurements after adjustment
260 ///
261 /// Returns `Some(new_size)` if size actually changed, `None` if no change needed.
262 fn calculate_adjustment(&mut self) -> Option<usize> {
263 let avg = self.average_occupancy();
264 let old_size = self.current_size;
265
266 let new_size = if avg > 80 {
267 // High occupancy: producer is waiting on channel, consumer is slow
268 // → DECREASE chunk_size to reduce backpressure and latency
269 ((self.current_size as f64 / 1.5).floor() as usize).max(self.min_size)
270 } else if avg < 20 {
271 // Low occupancy: consumer is draining fast, producer could batch more
272 // → INCREASE chunk_size to amortize parsing cost and reduce context switches
273 ((self.current_size as f64 * 1.5).ceil() as usize).min(self.max_size)
274 } else {
275 old_size
276 };
277
278 // Only return if there was an actual change
279 if new_size != old_size {
280 self.current_size = new_size;
281 self.last_adjustment_time = Some(Instant::now());
282 self.measurements.clear(); // Reset window for fresh observations
283 Some(new_size)
284 } else {
285 None
286 }
287 }
288}
289
impl Default for AdaptiveChunking {
    /// Build a controller with the default configuration by delegating to `Self::new()`
    fn default() -> Self {
        Self::new()
    }
}
295
// Tests are declared as a separate `tests` module and compiled only for test builds.
#[cfg(test)]
mod tests;