1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
//! Adaptive batch sizing for Net.
//!
//! This module provides dynamic batch sizing based on queue pressure
//! and latency targets, optimizing throughput for different workload patterns.
use std::sync::atomic::{AtomicU64, Ordering};
use super::protocol::MAX_PAYLOAD_SIZE;
/// Default lower bound on batch size, in bytes (1 KiB).
pub const DEFAULT_MIN_BATCH_SIZE: usize = 1 << 10;
/// Default upper bound on batch size, in bytes: the protocol's
/// `MAX_PAYLOAD_SIZE` (noted as 8KB where that constant was introduced —
/// confirm against `protocol`).
pub const DEFAULT_MAX_BATCH_SIZE: usize = MAX_PAYLOAD_SIZE;
/// Default per-batch latency target, in microseconds.
pub const DEFAULT_TARGET_LATENCY_US: u64 = 100;
/// Reported queue depth above which `record` raises the burst flag.
const BURST_THRESHOLD: usize = 50;
/// Reported queue depth above which `optimal_size` returns the maximum batch size.
const HIGH_QUEUE_THRESHOLD: usize = 100;
/// Adaptive batch sizing driven by queue pressure and observed latency.
///
/// [`AdaptiveBatcher::optimal_size`] chooses a batch size from the state most
/// recently reported through [`AdaptiveBatcher::record`]:
///
/// - **High queue depth**: above `HIGH_QUEUE_THRESHOLD` (100) pending items,
///   the maximum batch size is used.
/// - **Latency pressure**: otherwise, when the average batch latency exceeds
///   the target, the batch size is scaled down in proportion to the overshoot
///   and bounded by the configured sizes.
/// - **Normal mode**: with a shallow queue (at or below `BURST_THRESHOLD`) and
///   latency within target, the midpoint of the minimum and maximum is used.
///
/// All live state is held in atomics, so the batcher can be shared by
/// reference; every method takes `&self`.
///
/// # Performance
///
/// Adaptive batching aims to raise throughput for bursty workloads by
/// amortizing per-packet overhead while respecting a latency target. A
/// 15-30% gain has been cited; benchmark your workload before relying on it.
pub struct AdaptiveBatcher {
    /// Smallest batch size in bytes that latency pressure may shrink to.
    min_batch_size: usize,
    /// Largest batch size in bytes.
    max_batch_size: usize,
    /// Latency target in microseconds.
    target_latency_us: u64,
    /// Queue depth passed to the most recent `record` call.
    queue_depth: AtomicU64,
    /// Set by `record` when the reported queue depth exceeds `BURST_THRESHOLD`.
    burst_detected: std::sync::atomic::AtomicBool,
    /// Exponential moving average of batch latency, stored as fixed point
    /// (microseconds * 1000) to keep sub-microsecond precision in an integer.
    avg_batch_latency_us_x1000: AtomicU64,
    /// Number of batches recorded since construction or the last metrics reset.
    total_batches: AtomicU64,
    /// Bytes recorded since construction or the last metrics reset.
    total_bytes: AtomicU64,
}
impl AdaptiveBatcher {
    /// Create a new adaptive batcher with default settings.
    ///
    /// Equivalent to `with_config(DEFAULT_MIN_BATCH_SIZE, DEFAULT_MAX_BATCH_SIZE,
    /// DEFAULT_TARGET_LATENCY_US)`.
    pub fn new() -> Self {
        Self::with_config(
            DEFAULT_MIN_BATCH_SIZE,
            DEFAULT_MAX_BATCH_SIZE,
            DEFAULT_TARGET_LATENCY_US,
        )
    }

    /// Create a new adaptive batcher with custom configuration.
    ///
    /// # Arguments
    ///
    /// * `min_batch_size` - Lower bound in bytes used when latency pressure shrinks batches
    /// * `max_batch_size` - Upper bound in bytes; returned directly under high queue depth
    /// * `target_latency_us` - Latency target; an average above it shrinks batches
    ///
    /// The latency EMA is seeded at half of `target_latency_us`, so a fresh
    /// batcher starts below its target. If `min_batch_size > max_batch_size`,
    /// [`optimal_size`](Self::optimal_size) does not panic; `max_batch_size`
    /// wins in the latency-pressure tier.
    pub fn with_config(
        min_batch_size: usize,
        max_batch_size: usize,
        target_latency_us: u64,
    ) -> Self {
        Self {
            min_batch_size,
            max_batch_size,
            target_latency_us,
            // Start at half target, using saturating arithmetic so an
            // unusually large `target_latency_us` cannot wrap u64.
            avg_batch_latency_us_x1000: AtomicU64::new(
                target_latency_us.saturating_mul(1000) / 2,
            ),
            queue_depth: AtomicU64::new(0),
            burst_detected: std::sync::atomic::AtomicBool::new(false),
            total_batches: AtomicU64::new(0),
            total_bytes: AtomicU64::new(0),
        }
    }

    /// Get the optimal batch size based on current conditions.
    ///
    /// Called before building a batch to decide how much data to accumulate
    /// before sending. Tiers are checked in order:
    ///
    /// 1. Queue depth above `HIGH_QUEUE_THRESHOLD`: `max_batch_size`.
    /// 2. Average latency above the target: `max_batch_size` scaled by
    ///    `target / avg_latency`, bounded to `[min_batch_size, max_batch_size]`.
    ///    If the bounds are inverted, `max_batch_size` wins.
    /// 3. Burst flag set (queue depth above `BURST_THRESHOLD`): three quarters
    ///    of the way from `min_batch_size` to `max_batch_size`.
    /// 4. Otherwise: the midpoint of `min_batch_size` and `max_batch_size`.
    #[inline]
    pub fn optimal_size(&self) -> usize {
        let queue_depth = self.queue_depth.load(Ordering::Relaxed) as usize;
        let avg_latency = self.avg_latency_us();

        // The burst flag is deliberately not part of this check. It is raised
        // at the lower BURST_THRESHOLD, and including it here made tier 3
        // below unreachable.
        if queue_depth > HIGH_QUEUE_THRESHOLD {
            // Heavy backlog: maximize batch size for throughput.
            self.max_batch_size
        } else if avg_latency > self.target_latency_us {
            // Latency pressure: shrink in proportion to the overshoot.
            // `avg_latency > target >= 0` guarantees a non-zero divisor.
            let ratio = self.target_latency_us as f64 / avg_latency as f64;
            let scaled = (self.max_batch_size as f64 * ratio) as usize;
            // `max`/`min` rather than `clamp`: `Ord::clamp` panics when
            // min > max, which a caller can configure via `with_config`.
            scaled.max(self.min_batch_size).min(self.max_batch_size)
        } else if self.burst_detected.load(Ordering::Relaxed) {
            // Medium pressure: lean toward larger batches.
            Self::weighted_size(self.min_batch_size, self.max_batch_size, 1, 3)
        } else {
            // Normal mode: moderate batch size.
            Self::weighted_size(self.min_batch_size, self.max_batch_size, 1, 1)
        }
    }

    /// Weighted mean `(min * w_min + max * w_max) / (w_min + w_max)`.
    ///
    /// Computed in `u128` because plain `usize` arithmetic overflows (and
    /// panics in debug builds) for very large configured sizes.
    #[inline]
    fn weighted_size(min: usize, max: usize, w_min: u128, w_max: u128) -> usize {
        let sum = min as u128 * w_min + max as u128 * w_max;
        // A weighted mean of two usize values never exceeds the larger one,
        // so the narrowing cast cannot truncate.
        (sum / (w_min + w_max)) as usize
    }

    /// Record metrics after sending a batch.
    ///
    /// This updates the internal state used by `optimal_size()`.
    ///
    /// # Arguments
    ///
    /// * `batch_size` - Size of the batch in bytes
    /// * `latency_us` - Time taken to send the batch in microseconds
    /// * `queue_depth` - Current pending queue depth
    #[inline]
    pub fn record(&self, batch_size: usize, latency_us: u64, queue_depth: usize) {
        // Update exponential moving average (alpha = 0.1)
        // EMA = alpha * new + (1 - alpha) * old
        // Using fixed-point: new_x1000 = (100 * new * 1000 + 900 * old) / 1000
        //
        // Saturating arithmetic prevents a pathological `latency_us` (e.g.
        // from a stuck timer) from wrapping the u64 and corrupting the EMA.
        let new_scaled = latency_us.saturating_mul(100).saturating_mul(1000);
        // `fetch_update` retries on contention, so concurrent `record` calls
        // cannot overwrite each other's samples the way a separate load and
        // store could. The closure always returns `Some`, so this never fails.
        let _ = self.avg_batch_latency_us_x1000.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |old| Some(new_scaled.saturating_add(old.saturating_mul(900)) / 1000),
        );

        // Update queue depth
        self.queue_depth.store(queue_depth as u64, Ordering::Relaxed);

        // Detect burst
        self.burst_detected
            .store(queue_depth > BURST_THRESHOLD, Ordering::Relaxed);

        // Update metrics
        self.total_batches.fetch_add(1, Ordering::Relaxed);
        self.total_bytes.fetch_add(batch_size as u64, Ordering::Relaxed);
    }

    /// Get the minimum batch size.
    #[inline]
    pub fn min_batch_size(&self) -> usize {
        self.min_batch_size
    }

    /// Get the maximum batch size.
    #[inline]
    pub fn max_batch_size(&self) -> usize {
        self.max_batch_size
    }

    /// Get the target latency in microseconds.
    #[inline]
    pub fn target_latency_us(&self) -> u64 {
        self.target_latency_us
    }

    /// Get the current average batch latency in microseconds (truncated).
    #[inline]
    pub fn avg_latency_us(&self) -> u64 {
        self.avg_batch_latency_us_x1000.load(Ordering::Relaxed) / 1000
    }

    /// Get the queue depth passed to the most recent `record` call.
    #[inline]
    pub fn current_queue_depth(&self) -> usize {
        self.queue_depth.load(Ordering::Relaxed) as usize
    }

    /// Check if burst mode is active (last reported queue depth above `BURST_THRESHOLD`).
    #[inline]
    pub fn is_burst_mode(&self) -> bool {
        self.burst_detected.load(Ordering::Relaxed)
    }

    /// Get the total number of batches processed.
    #[inline]
    pub fn total_batches(&self) -> u64 {
        self.total_batches.load(Ordering::Relaxed)
    }

    /// Get the total bytes sent.
    #[inline]
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Relaxed)
    }

    /// Get the average batch size in bytes, or 0 when no batches are recorded.
    #[inline]
    pub fn avg_batch_size(&self) -> usize {
        let batches = self.total_batches.load(Ordering::Relaxed);
        if batches == 0 {
            return 0;
        }
        (self.total_bytes.load(Ordering::Relaxed) / batches) as usize
    }

    /// Reset the batch and byte counters.
    ///
    /// The latency EMA, queue depth and burst flag are left untouched.
    pub fn reset_metrics(&self) {
        self.total_batches.store(0, Ordering::Relaxed);
        self.total_bytes.store(0, Ordering::Relaxed);
    }
}
impl Default for AdaptiveBatcher {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for AdaptiveBatcher {
    // Prints the configuration followed by a snapshot of the live counters.
    // Each counter is an independent atomic load, so under concurrent use the
    // values are not guaranteed to come from the same instant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("AdaptiveBatcher");
        // Static configuration.
        out.field("min_batch_size", &self.min_batch_size);
        out.field("max_batch_size", &self.max_batch_size);
        out.field("target_latency_us", &self.target_latency_us);
        // Live state read through the public getters.
        out.field("avg_latency_us", &self.avg_latency_us());
        out.field("queue_depth", &self.current_queue_depth());
        out.field("burst_mode", &self.is_burst_mode());
        out.field("total_batches", &self.total_batches());
        out.field("total_bytes", &self.total_bytes());
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_adaptive_batcher_default() {
        let batcher = AdaptiveBatcher::new();
        assert_eq!(batcher.min_batch_size(), DEFAULT_MIN_BATCH_SIZE);
        assert_eq!(batcher.max_batch_size(), DEFAULT_MAX_BATCH_SIZE);
        assert_eq!(batcher.target_latency_us(), DEFAULT_TARGET_LATENCY_US);
    }

    #[test]
    fn test_adaptive_batcher_custom_config() {
        let batcher = AdaptiveBatcher::with_config(512, 4096, 50);
        assert_eq!(batcher.min_batch_size(), 512);
        assert_eq!(batcher.max_batch_size(), 4096);
        assert_eq!(batcher.target_latency_us(), 50);
    }

    #[test]
    fn test_optimal_size_normal() {
        let batcher = AdaptiveBatcher::new();
        let size = batcher.optimal_size();
        // A fresh batcher has an empty queue and its latency EMA is seeded
        // below the target, so it sits in normal mode: the midpoint of the
        // bounds. Pin the exact value, not just the range.
        assert_eq!(
            size,
            (batcher.min_batch_size() + batcher.max_batch_size()) / 2
        );
        assert!(size >= batcher.min_batch_size());
        assert!(size <= batcher.max_batch_size());
    }

    #[test]
    fn test_optimal_size_burst_mode() {
        let batcher = AdaptiveBatcher::new();
        // Simulate burst: high queue depth
        batcher.record(1000, 50, 150);
        // Should use maximum batch size
        let size = batcher.optimal_size();
        assert_eq!(size, batcher.max_batch_size());
    }

    #[test]
    fn test_high_queue_depth_uses_max_size() {
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);
        // Just past the high-queue threshold must select the maximum.
        batcher.record(1000, 50, HIGH_QUEUE_THRESHOLD + 1);
        assert_eq!(batcher.optimal_size(), 8192);
    }

    #[test]
    fn test_queue_at_burst_threshold_is_not_burst() {
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);
        // `record` compares with a strict `>`, so exactly BURST_THRESHOLD
        // stays in normal mode.
        batcher.record(1000, 50, BURST_THRESHOLD);
        assert!(!batcher.is_burst_mode());
        assert_eq!(batcher.optimal_size(), (1024 + 8192) / 2);
    }

    #[test]
    fn test_optimal_size_latency_pressure() {
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);
        // Simulate high latency (200μs when target is 100μs)
        for _ in 0..10 {
            batcher.record(4096, 200, 10);
        }
        // Should reduce batch size due to latency pressure
        let size = batcher.optimal_size();
        assert!(size < batcher.max_batch_size());
        assert!(size >= batcher.min_batch_size());
    }

    #[test]
    fn test_record_updates_metrics() {
        let batcher = AdaptiveBatcher::new();
        batcher.record(1000, 50, 20);
        batcher.record(2000, 60, 30);
        batcher.record(1500, 55, 25);
        assert_eq!(batcher.total_batches(), 3);
        assert_eq!(batcher.total_bytes(), 4500);
        assert_eq!(batcher.avg_batch_size(), 1500);
    }

    #[test]
    fn test_avg_batch_size_without_batches() {
        // No batches recorded: must return 0 rather than divide by zero.
        let batcher = AdaptiveBatcher::new();
        assert_eq!(batcher.avg_batch_size(), 0);
    }

    #[test]
    fn test_burst_detection() {
        let batcher = AdaptiveBatcher::new();
        // Low queue depth - no burst
        batcher.record(1000, 50, 10);
        assert!(!batcher.is_burst_mode());
        // High queue depth - burst detected
        batcher.record(1000, 50, 100);
        assert!(batcher.is_burst_mode());
        // Queue drains - burst ends
        batcher.record(1000, 50, 10);
        assert!(!batcher.is_burst_mode());
    }

    #[test]
    fn test_ema_convergence() {
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);
        // Record consistent latency
        for _ in 0..100 {
            batcher.record(4096, 80, 10);
        }
        // EMA should converge close to 80
        let avg = batcher.avg_latency_us();
        assert!(
            (75..=85).contains(&avg),
            "EMA should converge to ~80, got {}",
            avg
        );
    }

    #[test]
    fn test_reset_metrics() {
        let batcher = AdaptiveBatcher::new();
        batcher.record(1000, 50, 20);
        batcher.record(2000, 60, 30);
        assert!(batcher.total_batches() > 0);
        assert!(batcher.total_bytes() > 0);
        batcher.reset_metrics();
        assert_eq!(batcher.total_batches(), 0);
        assert_eq!(batcher.total_bytes(), 0);
    }

    #[test]
    fn test_debug_format() {
        let batcher = AdaptiveBatcher::new();
        let debug = format!("{:?}", batcher);
        assert!(debug.contains("AdaptiveBatcher"));
        assert!(debug.contains("min_batch_size"));
        assert!(debug.contains("max_batch_size"));
        assert!(debug.contains("burst_mode"));
        assert!(debug.contains("total_bytes"));
    }

    #[test]
    fn test_ema_init_does_not_overflow_on_huge_target() {
        // Regression: `target_latency_us * 1000 / 2` previously wrapped for
        // large `target_latency_us`. Saturating arithmetic must keep the
        // seed within u64 and leave the batcher in a sane state.
        let batcher = AdaptiveBatcher::with_config(1024, 8192, u64::MAX);
        // No panic, and the seed is clamped rather than wrapped to a small value.
        assert!(batcher.avg_latency_us() >= u64::MAX / 2000);
    }

    #[test]
    fn test_ema_update_does_not_overflow_on_huge_latency() {
        // Regression: `100 * latency_us * 1000 + 900 * old` previously
        // wrapped u64 for pathological inputs. Saturating arithmetic must
        // clamp to u64::MAX without corrupting the stored EMA.
        let batcher = AdaptiveBatcher::with_config(1024, 8192, 100);
        batcher.record(4096, u64::MAX, 10);
        // No panic in debug; no wrapping in release. EMA should be huge,
        // not a tiny wrapped value.
        assert!(batcher.avg_latency_us() > 1_000_000);
        // optimal_size must still return a value within bounds rather than
        // panicking or returning 0 from an infinity cast.
        let size = batcher.optimal_size();
        assert!(size >= batcher.min_batch_size());
        assert!(size <= batcher.max_batch_size());
    }
}