Skip to main content

fgumi_lib/
reorder_buffer.rs

1//! Reordering buffer for out-of-order batch completion.
2//!
3//! This module provides a buffer that accepts items tagged with sequence numbers
4//! and releases them in sequential order. It's used by the ordered work-stealing
5//! pipeline to maintain output order when batches complete out of order.
6//!
7//! # Example
8//!
9//! ```
10//! use fgumi_lib::reorder_buffer::ReorderBuffer;
11//!
12//! let mut buffer: ReorderBuffer<String> = ReorderBuffer::new();
13//!
14//! // Insert items out of order
15//! buffer.insert(2, "third".to_string());
16//! buffer.insert(0, "first".to_string());
17//! buffer.insert(1, "second".to_string());
18//!
19//! // Pop in sequence order
20//! assert_eq!(buffer.try_pop_next(), Some("first".to_string()));
21//! assert_eq!(buffer.try_pop_next(), Some("second".to_string()));
22//! assert_eq!(buffer.try_pop_next(), Some("third".to_string()));
23//! assert_eq!(buffer.try_pop_next(), None);
24//! ```
25
26use std::collections::VecDeque;
27
28use crate::unified_pipeline::MemoryEstimate;
29
/// An ordering buffer: accepts sequence-tagged items in any order and hands
/// them back strictly by ascending sequence number.
///
/// An item becomes available only once every item with a smaller sequence
/// number has already been released. This lets a parallel pipeline emit
/// results in input order even though the work finishes out of order.
///
/// Storage is a `VecDeque` of optional slots addressed by `seq - next_seq`;
/// sequence numbers that have not arrived yet occupy `None` placeholders.
/// Releasing the front item is O(1). Inserting is O(1) when the slot already
/// exists; otherwise the deque is padded with one placeholder per skipped
/// sequence number.
///
/// # Memory Tracking
///
/// Every stored item carries a caller-supplied heap size (0 when inserted via
/// the untracked `insert`). The running total is maintained by
/// `insert_with_size`/`try_pop_next_with_size`, so backpressure decisions can
/// read it without re-measuring the buffered items.
#[derive(Debug)]
pub struct ReorderBuffer<T> {
    /// Slot `i` belongs to sequence number `next_seq + i`. A filled slot holds
    /// `(item, heap_size)`; `heap_size` is 0 for untracked inserts.
    buffer: VecDeque<Option<(T, usize)>>,
    /// Sequence number that will be released next; always the one mapped to `buffer[0]`.
    next_seq: u64,
    /// How many slots are currently filled (placeholders are not counted).
    count: usize,
    /// Sum of the heap sizes recorded for the filled slots, in bytes.
    heap_bytes: u64,
    /// Cached answer to "is the front slot filled?", refreshed on every
    /// insert and pop so callers can poll it cheaply.
    can_pop: bool,
}
59
60impl<T> ReorderBuffer<T> {
61    /// Create a new reorder buffer.
62    #[must_use]
63    pub fn new() -> Self {
64        Self { buffer: VecDeque::new(), next_seq: 0, count: 0, heap_bytes: 0, can_pop: false }
65    }
66
67    /// Insert an item with a sequence number (without memory tracking).
68    ///
69    /// Items can be inserted in any order. They will be released
70    /// in sequence order via `try_pop_next()` or `drain_ready()`.
71    ///
72    /// # Arguments
73    ///
74    /// * `seq` - The sequence number for this item (0-indexed, monotonically increasing).
75    /// * `item` - The item to buffer.
76    ///
77    /// # Panics
78    ///
79    /// Panics in debug mode if an item with the same sequence number is already buffered,
80    /// or if the sequence number is before the current base.
81    pub fn insert(&mut self, seq: u64, item: T) {
82        self.insert_with_size(seq, item, 0);
83    }
84
85    /// Insert an item with a sequence number and pre-computed heap size.
86    ///
87    /// This variant tracks memory usage for backpressure control.
88    /// The size should be computed once when creating the item.
89    ///
90    /// # Arguments
91    ///
92    /// * `seq` - The sequence number for this item (0-indexed, monotonically increasing).
93    /// * `item` - The item to buffer.
94    /// * `heap_size` - Pre-computed heap size of the item in bytes.
95    ///
96    /// # Panics
97    ///
98    /// Panics in debug mode if an item with the same sequence number is already buffered,
99    /// or if the sequence number is before the current base.
100    #[allow(clippy::cast_possible_truncation)]
101    pub fn insert_with_size(&mut self, seq: u64, item: T, heap_size: usize) {
102        debug_assert!(
103            seq >= self.next_seq,
104            "Sequence number {seq} is before base {}",
105            self.next_seq
106        );
107
108        let index = (seq - self.next_seq) as usize;
109
110        // Extend buffer with None entries if needed
111        while self.buffer.len() <= index {
112            self.buffer.push_back(None);
113        }
114
115        debug_assert!(self.buffer[index].is_none(), "Duplicate sequence number: {seq}");
116        self.buffer[index] = Some((item, heap_size));
117        self.count += 1;
118        self.heap_bytes += heap_size as u64;
119
120        // Update can_pop: if we inserted at index 0, we can now pop
121        if index == 0 {
122            self.can_pop = true;
123        }
124    }
125
126    /// Pop the next sequential item if available (without returning size).
127    ///
128    /// Returns `Some(item)` if the item with `next_seq` is buffered,
129    /// otherwise returns `None`.
130    ///
131    /// This advances the internal sequence counter, so subsequent calls
132    /// will return the next item in sequence.
133    #[must_use]
134    pub fn try_pop_next(&mut self) -> Option<T> {
135        self.try_pop_next_with_size().map(|(item, _size)| item)
136    }
137
138    /// Pop the next sequential item if available, returning the item and its tracked size.
139    ///
140    /// Returns `Some((item, heap_size))` if the item with `next_seq` is buffered,
141    /// otherwise returns `None`.
142    ///
143    /// # Panics
144    ///
145    /// Panics if internal state is inconsistent (the front item was indicated as poppable but is missing).
146    #[must_use]
147    pub fn try_pop_next_with_size(&mut self) -> Option<(T, usize)> {
148        // Check if front item is present (handles empty buffer and gaps)
149        if !self.can_pop {
150            return None;
151        }
152
153        // Pop the front item
154        let (item, size) = self.buffer.pop_front().unwrap().unwrap();
155        self.next_seq += 1;
156        self.count -= 1;
157        self.heap_bytes = self.heap_bytes.saturating_sub(size as u64);
158
159        // Update can_pop for next item
160        self.can_pop = self.buffer.front().is_some_and(Option::is_some);
161
162        Some((item, size))
163    }
164
165    /// Drain all consecutive ready items starting from the current sequence.
166    ///
167    /// Returns an iterator that yields items in sequence order, stopping
168    /// when it reaches a gap in the sequence.
169    ///
170    /// # Example
171    ///
172    /// ```
173    /// use fgumi_lib::reorder_buffer::ReorderBuffer;
174    ///
175    /// let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
176    /// buffer.insert(0, 10);
177    /// buffer.insert(1, 20);
178    /// buffer.insert(3, 40);  // Gap at 2
179    ///
180    /// let ready: Vec<_> = buffer.drain_ready().collect();
181    /// assert_eq!(ready, vec![10, 20]);  // Stops at gap
182    /// ```
183    pub fn drain_ready(&mut self) -> DrainReady<'_, T> {
184        DrainReady { buffer: self }
185    }
186
187    /// Check if the buffer is empty.
188    #[must_use]
189    pub fn is_empty(&self) -> bool {
190        self.count == 0
191    }
192
193    /// Get the number of items currently stored in the buffer.
194    #[must_use]
195    pub fn len(&self) -> usize {
196        self.count
197    }
198
199    /// Get the internal buffer length (number of slots including gaps).
200    ///
201    /// This reflects the actual memory footprint of the `VecDeque`, which grows
202    /// when items arrive out of order (gaps are filled with `None`). Use this
203    /// to limit memory growth when items may arrive with large serial gaps.
204    #[must_use]
205    pub fn buffer_len(&self) -> usize {
206        self.buffer.len()
207    }
208
209    /// Get the next expected sequence number.
210    #[must_use]
211    pub fn next_seq(&self) -> u64 {
212        self.next_seq
213    }
214
215    /// Check if the next sequential item is ready to pop.
216    ///
217    /// This is O(1) and can be called frequently for backpressure decisions.
218    #[must_use]
219    pub fn can_pop(&self) -> bool {
220        self.can_pop
221    }
222
223    /// Get the tracked heap memory in bytes.
224    ///
225    /// This returns the sum of sizes passed to `insert_with_size`.
226    /// It's O(1) and suitable for frequent backpressure checks.
227    #[must_use]
228    pub fn heap_bytes(&self) -> u64 {
229        self.heap_bytes
230    }
231
232    /// Check if this reorder buffer would accept a new item with the given serial.
233    ///
234    /// This implements deadlock-free admission control:
235    /// - Always accept if serial == `next_seq` (the item we're waiting for)
236    /// - Always accept if we can't pop (we're stuck, need more items)
237    /// - Reject if over the memory limit AND we can make progress
238    ///
239    /// # Arguments
240    ///
241    /// * `serial` - The sequence number of the item to potentially insert
242    /// * `memory_limit` - The memory limit in bytes (0 = unlimited)
243    ///
244    /// # Returns
245    ///
246    /// `true` if the item should be accepted, `false` if backpressure should be applied.
247    #[must_use]
248    pub fn would_accept(&self, serial: u64, memory_limit: u64) -> bool {
249        // No limit = always accept
250        if memory_limit == 0 {
251            return true;
252        }
253
254        // Always accept the item we need next (avoids deadlock)
255        if serial == self.next_seq {
256            return true;
257        }
258
259        // If we can't make progress, accept everything (avoids deadlock)
260        if !self.can_pop {
261            return true;
262        }
263
264        // We can make progress and this isn't the needed item.
265        // Apply backpressure if over limit.
266        self.heap_bytes < memory_limit
267    }
268
269    /// Calculate total heap memory used by all items in the buffer.
270    ///
271    /// This iterates all items and sums their heap sizes. Only use for
272    /// profiling/debugging as it's O(n) in the number of buffered items.
273    ///
274    /// Note: This calculates fresh from items using `MemoryEstimate`, not from
275    /// tracked sizes. Use `heap_bytes()` for the tracked value.
276    #[must_use]
277    pub fn total_heap_size(&self) -> usize
278    where
279        T: MemoryEstimate,
280    {
281        self.buffer
282            .iter()
283            .filter_map(|opt| opt.as_ref())
284            .map(|(item, _size)| item.estimate_heap_size())
285            .sum()
286    }
287
288    /// Set the next expected sequence number (for testing or reset scenarios).
289    #[cfg(test)]
290    pub fn set_next_seq(&mut self, seq: u64) {
291        self.next_seq = seq;
292    }
293}
294
295impl<T> Default for ReorderBuffer<T> {
296    fn default() -> Self {
297        Self::new()
298    }
299}
300
301/// Iterator that drains consecutive ready items from a `ReorderBuffer`.
302pub struct DrainReady<'a, T> {
303    buffer: &'a mut ReorderBuffer<T>,
304}
305
306impl<T> Iterator for DrainReady<'_, T> {
307    type Item = T;
308
309    fn next(&mut self) -> Option<Self::Item> {
310        self.buffer.try_pop_next()
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Items inserted in sequence order come back in the same order.
    #[test]
    fn test_in_order_insertion() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        buffer.insert(0, 100);
        buffer.insert(1, 200);
        buffer.insert(2, 300);

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None);
    }

    /// Items inserted in reverse order are still released by ascending sequence.
    #[test]
    fn test_out_of_order_insertion() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Insert in reverse order
        buffer.insert(2, 300);
        buffer.insert(1, 200);
        buffer.insert(0, 100);

        // Should still pop in sequence order
        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None);
    }

    /// A missing sequence number blocks everything behind it until it arrives.
    #[test]
    fn test_gap_blocks_progress() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        buffer.insert(0, 100);
        buffer.insert(2, 300); // Gap at 1

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), None); // Blocked on missing 1

        // Fill the gap
        buffer.insert(1, 200);

        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None);
    }

    /// `drain_ready` yields the consecutive prefix and stops at the first gap.
    #[test]
    fn test_drain_ready() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        buffer.insert(0, 100);
        buffer.insert(1, 200);
        buffer.insert(3, 400); // Gap at 2

        let ready: Vec<_> = buffer.drain_ready().collect();
        assert_eq!(ready, vec![100, 200]);

        // Buffer should have 3 still pending
        assert!(!buffer.is_empty());
        assert_eq!(buffer.next_seq(), 2);

        // Fill gap and drain again
        buffer.insert(2, 300);
        let more: Vec<_> = buffer.drain_ready().collect();
        assert_eq!(more, vec![300, 400]);
        assert!(buffer.is_empty());
    }

    /// Several gaps of different widths are handled as they are filled in.
    #[test]
    fn test_sparse_insertion() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Insert with gaps
        buffer.insert(0, 100);
        buffer.insert(5, 600);
        buffer.insert(2, 300);

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), None); // Gap at 1

        buffer.insert(1, 200);
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert_eq!(buffer.try_pop_next(), Some(300));
        assert_eq!(buffer.try_pop_next(), None); // Gap at 3, 4

        buffer.insert(3, 400);
        buffer.insert(4, 500);

        let rest: Vec<_> = buffer.drain_ready().collect();
        assert_eq!(rest, vec![400, 500, 600]);
    }

    /// The buffer works relative to its base, so a large starting sequence is fine.
    #[test]
    fn test_large_sequence_numbers() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Start from a large sequence number
        let start = 1_000_000u64;
        buffer.set_next_seq(start);

        buffer.insert(start, 100);
        buffer.insert(start + 1, 200);

        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.try_pop_next(), Some(200));
    }

    /// Tracked heap bytes rise on insert and fall by the item's size on pop.
    #[test]
    fn test_memory_tracking() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Insert with sizes
        buffer.insert_with_size(0, 100, 1000);
        buffer.insert_with_size(1, 200, 2000);
        buffer.insert_with_size(2, 300, 3000);

        assert_eq!(buffer.heap_bytes(), 6000);
        assert_eq!(buffer.len(), 3);

        // Pop and verify memory decreases
        let (val, size) = buffer.try_pop_next_with_size().unwrap();
        assert_eq!(val, 100);
        assert_eq!(size, 1000);
        assert_eq!(buffer.heap_bytes(), 5000);

        let (val, size) = buffer.try_pop_next_with_size().unwrap();
        assert_eq!(val, 200);
        assert_eq!(size, 2000);
        assert_eq!(buffer.heap_bytes(), 3000);
    }

    /// `buffer_len` counts placeholder slots for gaps, while `len` counts only items.
    #[test]
    fn test_buffer_len_includes_gap_slots() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Inserting at seq 3 creates placeholder slots for 0, 1 and 2
        buffer.insert(3, 400);
        assert_eq!(buffer.len(), 1);
        assert_eq!(buffer.buffer_len(), 4);

        // Popping the front slot shrinks the slot count by one
        buffer.insert(0, 100);
        assert_eq!(buffer.try_pop_next(), Some(100));
        assert_eq!(buffer.len(), 1);
        assert_eq!(buffer.buffer_len(), 3);
    }

    /// The cached `can_pop` flag follows the state of the front slot.
    #[test]
    fn test_can_pop_tracking() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();

        // Empty buffer - can't pop
        assert!(!buffer.can_pop());

        // Insert at seq 1 (gap at 0) - still can't pop
        buffer.insert(1, 200);
        assert!(!buffer.can_pop());

        // Insert at seq 0 - now can pop
        buffer.insert(0, 100);
        assert!(buffer.can_pop());

        // Pop - should still be able to pop (seq 1 is ready)
        assert_eq!(buffer.try_pop_next(), Some(100));
        assert!(buffer.can_pop());

        // Pop again - now can't pop (empty)
        assert_eq!(buffer.try_pop_next(), Some(200));
        assert!(!buffer.can_pop());
    }

    /// A memory limit of 0 means "unlimited": everything is accepted.
    #[test]
    fn test_would_accept_no_limit() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(0, 100, 1_000_000_000); // 1GB

        // With no limit (0), always accept
        assert!(buffer.would_accept(1, 0));
        assert!(buffer.would_accept(999, 0));
    }

    /// The serial the buffer is waiting for is accepted even when over the limit.
    #[test]
    fn test_would_accept_needed_serial() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(1, 200, 1_000_000_000); // 1GB, creates gap at 0

        // We need serial 0, so always accept it even if over limit
        assert!(buffer.would_accept(0, 100)); // limit is 100 bytes, we have 1GB
    }

    /// While nothing can be popped, every serial is accepted to avoid deadlock.
    #[test]
    fn test_would_accept_stuck() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(2, 300, 1_000_000_000); // Gap at 0, 1

        // Buffer is stuck (can't pop), so accept everything
        assert!(!buffer.can_pop());
        assert!(buffer.would_accept(3, 100)); // Not the needed serial, but we're stuck
        assert!(buffer.would_accept(10, 100)); // Still stuck
    }

    /// When progress is possible, other serials are rejected once over the limit.
    #[test]
    fn test_would_accept_over_limit() {
        let mut buffer: ReorderBuffer<i32> = ReorderBuffer::new();
        buffer.insert_with_size(0, 100, 1000);

        // Can pop and over limit - reject non-needed serial
        assert!(buffer.can_pop());
        assert_eq!(buffer.heap_bytes(), 1000);

        // Serial 1 is not the needed serial (0 is), and we're at 1000 bytes
        // With limit 500, we're over limit, should reject
        assert!(!buffer.would_accept(1, 500));

        // Serial 0 equals next_seq, so it is accepted regardless of the limit.
        // would_accept is only an admission check: it does not look at whether
        // slot 0 is already occupied.
        assert!(buffer.would_accept(0, 500));

        // With limit 2000, we're under limit, should accept
        assert!(buffer.would_accept(1, 2000));
    }
}