fgumi_lib/reorder_buffer.rs
1//! Reordering buffer for out-of-order batch completion.
2//!
3//! This module provides a buffer that accepts items tagged with sequence numbers
4//! and releases them in sequential order. It's used by the ordered work-stealing
5//! pipeline to maintain output order when batches complete out of order.
6//!
7//! # Example
8//!
9//! ```
10//! use fgumi_lib::reorder_buffer::ReorderBuffer;
11//!
12//! let mut buffer: ReorderBuffer<String> = ReorderBuffer::new();
13//!
14//! // Insert items out of order
15//! buffer.insert(2, "third".to_string());
16//! buffer.insert(0, "first".to_string());
17//! buffer.insert(1, "second".to_string());
18//!
19//! // Pop in sequence order
20//! assert_eq!(buffer.try_pop_next(), Some("first".to_string()));
21//! assert_eq!(buffer.try_pop_next(), Some("second".to_string()));
22//! assert_eq!(buffer.try_pop_next(), Some("third".to_string()));
23//! assert_eq!(buffer.try_pop_next(), None);
24//! ```
25
26use std::collections::VecDeque;
27
28use crate::unified_pipeline::MemoryEstimate;
29
30/// A buffer that releases items in sequential order.
31///
32/// Items can be inserted with any sequence number, but they are only
33/// released when all prior sequence numbers have been released.
34/// This enables ordered output from parallel processing where
35/// items may complete out of order.
36///
37/// Uses a sparse `VecDeque` for O(1) insert and pop operations.
38///
39/// # Memory Tracking
40///
41/// The buffer optionally tracks heap memory usage when using the
42/// `insert_with_size`/`try_pop_next_with_size` methods. This enables
43/// memory-bounded backpressure without expensive per-item size calculations.
44#[derive(Debug)]
45pub struct ReorderBuffer<T> {
46 /// Sparse buffer: index (seq - `next_seq`) maps to `Option<(T, usize)>` where
47 /// usize is the pre-computed heap size (0 if not tracked).
48 buffer: VecDeque<Option<(T, usize)>>,
49 /// Next sequence number to release (also the sequence number corresponding to buffer[0]).
50 next_seq: u64,
51 /// Number of items currently stored.
52 count: usize,
53 /// Total tracked heap memory in bytes.
54 heap_bytes: u64,
55 /// Whether the next sequential item is ready to pop (optimization to avoid
56 /// repeated checks). Updated on insert/pop.
57 can_pop: bool,
58}
59
60impl<T> ReorderBuffer<T> {
61 /// Create a new reorder buffer.
62 #[must_use]
63 pub fn new() -> Self {
64 Self { buffer: VecDeque::new(), next_seq: 0, count: 0, heap_bytes: 0, can_pop: false }
65 }
66
67 /// Insert an item with a sequence number (without memory tracking).
68 ///
69 /// Items can be inserted in any order. They will be released
70 /// in sequence order via `try_pop_next()` or `drain_ready()`.
71 ///
72 /// # Arguments
73 ///
74 /// * `seq` - The sequence number for this item (0-indexed, monotonically increasing).
75 /// * `item` - The item to buffer.
76 ///
77 /// # Panics
78 ///
79 /// Panics in debug mode if an item with the same sequence number is already buffered,
80 /// or if the sequence number is before the current base.
81 pub fn insert(&mut self, seq: u64, item: T) {
82 self.insert_with_size(seq, item, 0);
83 }
84
85 /// Insert an item with a sequence number and pre-computed heap size.
86 ///
87 /// This variant tracks memory usage for backpressure control.
88 /// The size should be computed once when creating the item.
89 ///
90 /// # Arguments
91 ///
92 /// * `seq` - The sequence number for this item (0-indexed, monotonically increasing).
93 /// * `item` - The item to buffer.
94 /// * `heap_size` - Pre-computed heap size of the item in bytes.
95 ///
96 /// # Panics
97 ///
98 /// Panics in debug mode if an item with the same sequence number is already buffered,
99 /// or if the sequence number is before the current base.
100 #[allow(clippy::cast_possible_truncation)]
101 pub fn insert_with_size(&mut self, seq: u64, item: T, heap_size: usize) {
102 debug_assert!(
103 seq >= self.next_seq,
104 "Sequence number {seq} is before base {}",
105 self.next_seq
106 );
107
108 let index = (seq - self.next_seq) as usize;
109
110 // Extend buffer with None entries if needed
111 while self.buffer.len() <= index {
112 self.buffer.push_back(None);
113 }
114
115 debug_assert!(self.buffer[index].is_none(), "Duplicate sequence number: {seq}");
116 self.buffer[index] = Some((item, heap_size));
117 self.count += 1;
118 self.heap_bytes += heap_size as u64;
119
120 // Update can_pop: if we inserted at index 0, we can now pop
121 if index == 0 {
122 self.can_pop = true;
123 }
124 }
125
126 /// Pop the next sequential item if available (without returning size).
127 ///
128 /// Returns `Some(item)` if the item with `next_seq` is buffered,
129 /// otherwise returns `None`.
130 ///
131 /// This advances the internal sequence counter, so subsequent calls
132 /// will return the next item in sequence.
133 #[must_use]
134 pub fn try_pop_next(&mut self) -> Option<T> {
135 self.try_pop_next_with_size().map(|(item, _size)| item)
136 }
137
138 /// Pop the next sequential item if available, returning the item and its tracked size.
139 ///
140 /// Returns `Some((item, heap_size))` if the item with `next_seq` is buffered,
141 /// otherwise returns `None`.
142 ///
143 /// # Panics
144 ///
145 /// Panics if internal state is inconsistent (the front item was indicated as poppable but is missing).
146 #[must_use]
147 pub fn try_pop_next_with_size(&mut self) -> Option<(T, usize)> {
148 // Check if front item is present (handles empty buffer and gaps)
149 if !self.can_pop {
150 return None;
151 }
152
153 // Pop the front item
154 let (item, size) = self.buffer.pop_front().unwrap().unwrap();
155 self.next_seq += 1;
156 self.count -= 1;
157 self.heap_bytes = self.heap_bytes.saturating_sub(size as u64);
158
159 // Update can_pop for next item
160 self.can_pop = self.buffer.front().is_some_and(Option::is_some);
161
162 Some((item, size))
163 }
164
165 /// Drain all consecutive ready items starting from the current sequence.
166 ///
167 /// Returns an iterator that yields items in sequence order, stopping
168 /// when it reaches a gap in the sequence.
169 ///
170 /// # Example
171 ///
172 /// ```
173 /// use fgumi_lib::reorder_buffer::ReorderBuffer;
174 ///
175 /// let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
176 /// buffer.insert(0, 10);
177 /// buffer.insert(1, 20);
178 /// buffer.insert(3, 40); // Gap at 2
179 ///
180 /// let ready: Vec<_> = buffer.drain_ready().collect();
181 /// assert_eq!(ready, vec![10, 20]); // Stops at gap
182 /// ```
183 pub fn drain_ready(&mut self) -> DrainReady<'_, T> {
184 DrainReady { buffer: self }
185 }
186
187 /// Check if the buffer is empty.
188 #[must_use]
189 pub fn is_empty(&self) -> bool {
190 self.count == 0
191 }
192
193 /// Get the number of items currently stored in the buffer.
194 #[must_use]
195 pub fn len(&self) -> usize {
196 self.count
197 }
198
199 /// Get the internal buffer length (number of slots including gaps).
200 ///
201 /// This reflects the actual memory footprint of the `VecDeque`, which grows
202 /// when items arrive out of order (gaps are filled with `None`). Use this
203 /// to limit memory growth when items may arrive with large serial gaps.
204 #[must_use]
205 pub fn buffer_len(&self) -> usize {
206 self.buffer.len()
207 }
208
209 /// Get the next expected sequence number.
210 #[must_use]
211 pub fn next_seq(&self) -> u64 {
212 self.next_seq
213 }
214
215 /// Check if the next sequential item is ready to pop.
216 ///
217 /// This is O(1) and can be called frequently for backpressure decisions.
218 #[must_use]
219 pub fn can_pop(&self) -> bool {
220 self.can_pop
221 }
222
223 /// Get the tracked heap memory in bytes.
224 ///
225 /// This returns the sum of sizes passed to `insert_with_size`.
226 /// It's O(1) and suitable for frequent backpressure checks.
227 #[must_use]
228 pub fn heap_bytes(&self) -> u64 {
229 self.heap_bytes
230 }
231
232 /// Check if this reorder buffer would accept a new item with the given serial.
233 ///
234 /// This implements deadlock-free admission control:
235 /// - Always accept if serial == `next_seq` (the item we're waiting for)
236 /// - Always accept if we can't pop (we're stuck, need more items)
237 /// - Reject if over the memory limit AND we can make progress
238 ///
239 /// # Arguments
240 ///
241 /// * `serial` - The sequence number of the item to potentially insert
242 /// * `memory_limit` - The memory limit in bytes (0 = unlimited)
243 ///
244 /// # Returns
245 ///
246 /// `true` if the item should be accepted, `false` if backpressure should be applied.
247 #[must_use]
248 pub fn would_accept(&self, serial: u64, memory_limit: u64) -> bool {
249 // No limit = always accept
250 if memory_limit == 0 {
251 return true;
252 }
253
254 // Always accept the item we need next (avoids deadlock)
255 if serial == self.next_seq {
256 return true;
257 }
258
259 // If we can't make progress, accept everything (avoids deadlock)
260 if !self.can_pop {
261 return true;
262 }
263
264 // We can make progress and this isn't the needed item.
265 // Apply backpressure if over limit.
266 self.heap_bytes < memory_limit
267 }
268
269 /// Calculate total heap memory used by all items in the buffer.
270 ///
271 /// This iterates all items and sums their heap sizes. Only use for
272 /// profiling/debugging as it's O(n) in the number of buffered items.
273 ///
274 /// Note: This calculates fresh from items using `MemoryEstimate`, not from
275 /// tracked sizes. Use `heap_bytes()` for the tracked value.
276 #[must_use]
277 pub fn total_heap_size(&self) -> usize
278 where
279 T: MemoryEstimate,
280 {
281 self.buffer
282 .iter()
283 .filter_map(|opt| opt.as_ref())
284 .map(|(item, _size)| item.estimate_heap_size())
285 .sum()
286 }
287
288 /// Set the next expected sequence number (for testing or reset scenarios).
289 #[cfg(test)]
290 pub fn set_next_seq(&mut self, seq: u64) {
291 self.next_seq = seq;
292 }
293}
294
295impl<T> Default for ReorderBuffer<T> {
296 fn default() -> Self {
297 Self::new()
298 }
299}
300
301/// Iterator that drains consecutive ready items from a `ReorderBuffer`.
302pub struct DrainReady<'a, T> {
303 buffer: &'a mut ReorderBuffer<T>,
304}
305
306impl<T> Iterator for DrainReady<'_, T> {
307 type Item = T;
308
309 fn next(&mut self) -> Option<Self::Item> {
310 self.buffer.try_pop_next()
311 }
312}
313
#[cfg(test)]
mod tests {
    //! Unit tests for `ReorderBuffer`: ordering, gap handling, memory
    //! accounting, readiness tracking and admission control.

    use super::*;

    #[test]
    fn test_default_is_empty() {
        let buffer: ReorderBuffer<i32> = ReorderBuffer::default();

        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.buffer_len(), 0);
        assert_eq!(buffer.next_seq(), 0);
        assert_eq!(buffer.heap_bytes(), 0);
        assert!(!buffer.can_pop());
    }

    #[test]
    fn test_in_order_insertion() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        buffer.insert(0, 100);
        buffer.insert(1, 200);
        buffer.insert(2, 300);

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None);
    }

    #[test]
    fn test_out_of_order_insertion() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Insert in reverse order
        buffer.insert(2, 300);
        buffer.insert(1, 200);
        buffer.insert(0, 100);

        // Should still pop in sequence order
        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None);
    }

    #[test]
    fn test_gap_blocks_progress() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        buffer.insert(0, 100);
        buffer.insert(2, 300); // Gap at 1

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), None); // Blocked on missing 1

        // Fill the gap
        buffer.insert(1, 200);

        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None);
    }

    #[test]
    fn test_drain_ready() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        buffer.insert(0, 100);
        buffer.insert(1, 200);
        buffer.insert(3, 400); // Gap at 2

        let ready: Vec<_> = buffer.drain_ready().collect();
        assert_eq!(ready, vec![100, 200]);

        // Sequence 3 is still pending behind the gap at 2
        assert!(!buffer.is_empty());
        assert_eq!(buffer.next_seq(), 2);

        // Fill gap and drain again
        buffer.insert(2, 300);
        let more: Vec<_> = buffer.drain_ready().collect();
        assert_eq!(more, vec![300, 400]);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_sparse_insertion() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Insert with gaps
        buffer.insert(0, 100);
        buffer.insert(5, 600);
        buffer.insert(2, 300);

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), None); // Gap at 1

        buffer.insert(1, 200);
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None); // Gap at 3, 4

        buffer.insert(3, 400);
        buffer.insert(4, 500);

        let rest: Vec<_> = buffer.drain_ready().collect();
        assert_eq!(rest, vec![400, 500, 600]);
    }

    #[test]
    fn test_buffer_len_counts_gaps() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // A single item at sequence 3 needs slots for 0..=3
        buffer.insert(3, 400);
        assert_eq!(buffer.buffer_len(), 4);
        assert_eq!(buffer.len(), 1);
        assert!(!buffer.is_empty());

        // Filling the gaps reuses the existing slots
        buffer.insert(0, 100);
        buffer.insert(1, 200);
        buffer.insert(2, 300);
        assert_eq!(buffer.buffer_len(), 4);
        assert_eq!(buffer.len(), 4);

        assert_eq!(buffer.drain_ready().count(), 4);
        assert_eq!(buffer.buffer_len(), 0);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_large_sequence_numbers() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Start from a large sequence number
        let start = 1_000_000u64;
        buffer.set_next_seq(start);

        buffer.insert(start, 100);
        buffer.insert(start + 1, 200);

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), Some(200));
    }

    #[test]
    fn test_memory_tracking() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Insert with sizes
        buffer.insert_with_size(0, 100, 1000);
        buffer.insert_with_size(1, 200, 2000);
        buffer.insert_with_size(2, 300, 3000);

        assert_eq!(buffer.heap_bytes(), 6000);
        assert_eq!(buffer.len(), 3);

        // Pop and verify memory decreases
        let (val, size) = buffer.try_pop_next_with_size().unwrap();
        assert_eq!(val, 100);
        assert_eq!(size, 1000);
        assert_eq!(buffer.heap_bytes(), 5000);

        let (val, size) = buffer.try_pop_next_with_size().unwrap();
        assert_eq!(val, 200);
        assert_eq!(size, 2000);
        assert_eq!(buffer.heap_bytes(), 3000);

        // Popping the last item returns the tracked total to zero
        let (val, size) = buffer.try_pop_next_with_size().unwrap();
        assert_eq!(val, 300);
        assert_eq!(size, 3000);
        assert_eq!(buffer.heap_bytes(), 0);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_can_pop_tracking() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Empty buffer - can't pop
        assert!(!buffer.can_pop());

        // Insert at seq 1 (gap at 0) - still can't pop
        buffer.insert(1, 200);
        assert!(!buffer.can_pop());

        // Insert at seq 0 - now can pop
        buffer.insert(0, 100);
        assert!(buffer.can_pop());

        // Pop - should still be able to pop (seq 1 is ready)
        assert_eq!(buffer.try_pop_next(), Some(100));
        assert!(buffer.can_pop());

        // Pop again - now can't pop (empty)
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert!(!buffer.can_pop());
    }

    #[test]
    fn test_would_accept_no_limit() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(0, 100, 1_000_000_000); // 1GB

        // With no limit (0), always accept
        assert!(buffer.would_accept(1, 0));
        assert!(buffer.would_accept(999, 0));
    }

    #[test]
    fn test_would_accept_needed_serial() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(1, 200, 1_000_000_000); // 1GB, creates gap at 0

        // We need serial 0, so always accept it even if over limit
        assert!(buffer.would_accept(0, 100)); // limit is 100 bytes, we have 1GB
    }

    #[test]
    fn test_would_accept_stuck() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(2, 300, 1_000_000_000); // Gap at 0, 1

        // Buffer is stuck (can't pop), so accept everything
        assert!(!buffer.can_pop());
        assert!(buffer.would_accept(3, 100)); // Not the needed serial, but we're stuck
        assert!(buffer.would_accept(10, 100)); // Still stuck
    }

    #[test]
    fn test_would_accept_over_limit() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(0, 100, 1000);

        // Can pop and over limit - reject non-needed serial
        assert!(buffer.can_pop());
        assert_eq!(buffer.heap_bytes(), 1000);

        // Serial 1 is not the needed serial (0 is), and we're at 1000 bytes.
        // With limit 500, we're over limit, should reject.
        assert!(!buffer.would_accept(1, 500));

        // A serial equal to `next_seq` is admitted regardless of the limit.
        // `would_accept` is purely an admission check and does not look at
        // whether that slot is already occupied.
        assert!(buffer.would_accept(0, 500));

        // With limit 2000, we're under limit, should accept
        assert!(buffer.would_accept(1, 2000));
    }
}
527}