#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
// SPDX-License-Identifier: AGPL-3.0-or-later
//! Deterministic sequence reordering primitive for parallel pipelines.
//!
//! `SequenceRing` accepts potentially out-of-order records tagged with a
//! sequence ID and emits them in-order.

use crossbeam_channel::{Receiver, Sender, bounded};
use std::collections::BTreeMap;
use tracing::{debug, warn};

/// Record with sequence ID for ordered processing
///
/// Wraps a record with a monotonically increasing sequence ID to enable
/// deterministic output ordering even when processing happens in parallel.
/// This is essential for maintaining data integrity when using multi-threaded
/// record processing.
///
/// # Examples
///
/// ```rust
/// use copybook_sequence_ring::SequencedRecord;
///
/// let record1 = SequencedRecord::new(1, "first record");
/// let record2 = SequencedRecord::new(2, "second record");
///
/// assert_eq!(record1.sequence_id, 1);
/// assert_eq!(record2.data, "second record");
/// ```
#[derive(Debug, Clone)]
pub struct SequencedRecord<T> {
    /// Sequence number for ordering
    ///
    /// Monotonically increasing ID assigned when record enters processing.
    /// Used to reconstruct original order after parallel processing.
    /// Sequence IDs are 1-based in practice (`SequenceRing` starts expecting 1).
    pub sequence_id: u64,

    /// The actual record data
    ///
    /// Payload being processed (e.g., binary COBOL record, JSON value).
    pub data: T,
}

44impl<T> SequencedRecord<T> {
45    /// Create a new sequenced record
46    ///
47    /// # Arguments
48    ///
49    /// * `sequence_id` - Unique monotonic sequence number for ordering
50    /// * `data` - Record payload to process
51    ///
52    /// # Examples
53    ///
54    /// ```rust
55    /// use copybook_sequence_ring::SequencedRecord;
56    ///
57    /// let record = SequencedRecord::new(42, vec![1, 2, 3]);
58    /// assert_eq!(record.sequence_id, 42);
59    /// assert_eq!(record.data, vec![1, 2, 3]);
60    /// ```
61    #[inline]
62    #[must_use]
63    pub fn new(sequence_id: u64, data: T) -> Self {
64        Self { sequence_id, data }
65    }
66}
67
/// Bounded channel with sequence tracking for ordered emission
///
/// Provides deterministic output ordering for parallel processing by buffering
/// out-of-order records and emitting them in sequence order. This is critical
/// for maintaining data integrity when processing COBOL records in parallel.
///
/// # How It Works
///
/// 1. Workers submit processed records with sequence IDs via sender channel
/// 2. `SequenceRing` receives records (potentially out-of-order)
/// 3. Out-of-order records are buffered in reorder buffer (`BTreeMap`)
/// 4. Records are emitted in strict sequence order via [`recv_ordered()`](SequenceRing::recv_ordered)
///
/// # Memory Bounds
///
/// - **Channel capacity** - Maximum records in flight between workers and consumer
/// - **Reorder window** - Maximum buffered out-of-order records (warns if exceeded)
///
/// # Performance Characteristics
///
/// - **O(log n)** insertion/removal from reorder buffer (`BTreeMap`)
/// - **O(1)** emission when records arrive in order (hot path)
/// - **Memory usage** - Bounded by channel capacity + reorder window size
///
/// # Shutdown
///
/// NOTE(review): the ring stores its own `Sender` clone in the `sender` field,
/// so dropping all externally obtained senders does not disconnect the channel
/// while the ring is alive — a blocking `recv_ordered()` would not observe
/// channel closure. Confirm the intended shutdown protocol for consumers.
///
/// # Examples
///
/// ```rust
/// use copybook_sequence_ring::{SequenceRing, SequencedRecord};
///
/// let mut ring = SequenceRing::new(100, 50); // 100 capacity, 50 max window
/// let sender = ring.sender();
///
/// // Simulate workers sending out-of-order results
/// sender.send(SequencedRecord::new(2, "second")).unwrap();
/// sender.send(SequencedRecord::new(1, "first")).unwrap();
/// sender.send(SequencedRecord::new(3, "third")).unwrap();
///
/// // Consumer receives in order
/// assert_eq!(ring.recv_ordered().unwrap(), Some("first"));
/// assert_eq!(ring.recv_ordered().unwrap(), Some("second"));
/// assert_eq!(ring.recv_ordered().unwrap(), Some("third"));
/// ```
#[derive(Debug)]
pub struct SequenceRing<T> {
    /// Channel for receiving processed records
    receiver: Receiver<SequencedRecord<T>>,

    /// Sender for processed records (cloned to workers)
    ///
    /// Kept alive for the lifetime of the ring so `sender()` can hand out
    /// clones at any time.
    sender: Sender<SequencedRecord<T>>,

    /// Buffer for out-of-order records
    ///
    /// `BTreeMap` provides O(log n) ordered access to buffered records.
    reorder_buffer: BTreeMap<u64, T>,

    /// Next expected sequence ID
    ///
    /// Monotonically incremented as records are emitted in order.
    /// Starts at 1 (sequence IDs are 1-based).
    next_sequence_id: u64,

    /// Maximum reordering window size
    ///
    /// Warning threshold for reorder buffer size (indicates excessive skew).
    /// Exceeding it only logs a warning; it is not a hard limit.
    max_window_size: usize,

    /// Channel capacity
    channel_capacity: usize,
}

137impl<T> SequenceRing<T> {
138    /// Create a new sequence ring with bounded capacity
139    ///
140    /// # Arguments
141    ///
142    /// * `channel_capacity` - Maximum number of records in flight between workers and consumer
143    /// * `max_window_size` - Maximum buffered out-of-order records (warning threshold)
144    ///
145    /// # Tuning Guidelines
146    ///
147    /// - **Channel capacity**: Should be 2-4x number of worker threads for good throughput
148    /// - **Reorder window**: Should be `channel_capacity` / 2 to allow for processing variance
149    ///
150    /// # Examples
151    ///
152    /// ```rust
153    /// use copybook_sequence_ring::SequenceRing;
154    ///
155    /// // For 4-worker pool: 16 capacity, 8 window
156    /// let ring: SequenceRing<String> = SequenceRing::new(16, 8);
157    ///
158    /// // For 8-worker pool: 32 capacity, 16 window
159    /// let ring: SequenceRing<Vec<u8>> = SequenceRing::new(32, 16);
160    /// ```
161    #[inline]
162    #[must_use]
163    pub fn new(channel_capacity: usize, max_window_size: usize) -> Self {
164        let (sender, receiver) = bounded(channel_capacity);
165
166        Self {
167            receiver,
168            sender,
169            reorder_buffer: BTreeMap::new(),
170            next_sequence_id: 1,
171            max_window_size,
172            channel_capacity,
173        }
174    }
175
176    /// Get a sender for workers to submit processed records
177    ///
178    /// Returns a cloneable sender that workers can use to submit processed
179    /// records back to the sequence ring. Multiple workers can share clones
180    /// of this sender.
181    ///
182    /// # Examples
183    ///
184    /// ```rust
185    /// use copybook_sequence_ring::{SequenceRing, SequencedRecord};
186    /// use std::thread;
187    ///
188    /// let mut ring = SequenceRing::new(10, 5);
189    /// let sender1 = ring.sender();
190    /// let sender2 = ring.sender(); // Clone for another worker
191    ///
192    /// // Workers can send concurrently
193    /// let handle = thread::spawn(move || {
194    ///     sender1.send(SequencedRecord::new(1, "data")).unwrap();
195    /// });
196    ///
197    /// sender2.send(SequencedRecord::new(2, "data")).unwrap();
198    /// handle.join().unwrap();
199    /// ```
200    #[inline]
201    #[must_use]
202    pub fn sender(&self) -> Sender<SequencedRecord<T>> {
203        self.sender.clone()
204    }
205
206    /// Receive the next record in sequence order.
207    /// Blocks until the next expected record is available.
208    ///
209    /// # Errors
210    /// Returns an error if the channel is disconnected.
211    #[inline]
212    #[must_use = "Handle the Result or propagate the error"]
213    pub fn recv_ordered(&mut self) -> Result<Option<T>, crossbeam_channel::RecvError> {
214        loop {
215            // Check if we have the next expected record in the reorder buffer
216            if let Some(record) = self.reorder_buffer.remove(&self.next_sequence_id) {
217                self.next_sequence_id += 1;
218                debug!(
219                    "Emitting record {} from reorder buffer",
220                    self.next_sequence_id - 1
221                );
222                return Ok(Some(record));
223            }
224
225            // Receive next record from channel
226            if let Ok(sequenced_record) = self.receiver.recv() {
227                let SequencedRecord { sequence_id, data } = sequenced_record;
228
229                match sequence_id.cmp(&self.next_sequence_id) {
230                    std::cmp::Ordering::Equal => {
231                        // This is the next expected record
232                        self.next_sequence_id += 1;
233                        debug!("Emitting record {} directly", sequence_id);
234                        return Ok(Some(data));
235                    }
236                    std::cmp::Ordering::Greater => {
237                        // Future record - buffer it
238                        debug!(
239                            "Buffering out-of-order record {} (expecting {})",
240                            sequence_id, self.next_sequence_id
241                        );
242                        self.reorder_buffer.insert(sequence_id, data);
243
244                        // Check reorder buffer size
245                        if self.reorder_buffer.len() > self.max_window_size {
246                            warn!(
247                                "Reorder buffer size ({}) exceeds maximum ({}), potential memory issue",
248                                self.reorder_buffer.len(),
249                                self.max_window_size
250                            );
251                        }
252                    }
253                    std::cmp::Ordering::Less => {
254                        // Past record - this shouldn't happen with proper sequencing
255                        warn!(
256                            "Received past record {} (expecting {}), ignoring",
257                            sequence_id, self.next_sequence_id
258                        );
259                    }
260                }
261            } else {
262                // Channel closed - emit any remaining buffered records
263                if let Some((_, record)) = self.reorder_buffer.pop_first() {
264                    debug!("Emitting remaining buffered record during shutdown");
265                    return Ok(Some(record));
266                }
267                debug!("Channel closed, no more records");
268                return Ok(None);
269            }
270        }
271    }
272
273    /// Try to receive the next record without blocking.
274    ///
275    /// # Errors
276    /// Returns an error if the channel is disconnected or would block.
277    #[inline]
278    #[must_use = "Handle the Result or propagate the error"]
279    pub fn try_recv_ordered(&mut self) -> Result<Option<T>, crossbeam_channel::TryRecvError> {
280        // Check if we have the next expected record in the reorder buffer
281        if let Some(record) = self.reorder_buffer.remove(&self.next_sequence_id) {
282            self.next_sequence_id += 1;
283            return Ok(Some(record));
284        }
285
286        // Try to receive from channel
287        match self.receiver.try_recv() {
288            Ok(sequenced_record) => {
289                let SequencedRecord { sequence_id, data } = sequenced_record;
290
291                match sequence_id.cmp(&self.next_sequence_id) {
292                    std::cmp::Ordering::Equal => {
293                        self.next_sequence_id += 1;
294                        Ok(Some(data))
295                    }
296                    std::cmp::Ordering::Greater => {
297                        // Buffer future record and return empty
298                        self.reorder_buffer.insert(sequence_id, data);
299                        Err(crossbeam_channel::TryRecvError::Empty)
300                    }
301                    std::cmp::Ordering::Less => {
302                        // Past record - ignore and try again
303                        warn!("Received past record {}, ignoring", sequence_id);
304                        Err(crossbeam_channel::TryRecvError::Empty)
305                    }
306                }
307            }
308            Err(e) => Err(e),
309        }
310    }
311
312    /// Get statistics about the sequence ring
313    ///
314    /// Returns current operational statistics including reorder buffer usage
315    /// and sequence tracking state.
316    ///
317    /// # Examples
318    ///
319    /// ```rust
320    /// use copybook_sequence_ring::{SequenceRing, SequencedRecord};
321    /// use crossbeam_channel::TryRecvError;
322    ///
323    /// let mut ring = SequenceRing::new(10, 5);
324    /// let sender = ring.sender();
325    ///
326    /// // Send out-of-order record (seq 2 when expecting 1)
327    /// sender.send(SequencedRecord::new(2, "data")).unwrap();
328    ///
329    /// // Force the ring to observe and buffer the out-of-order record
330    /// assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
331    ///
332    /// let stats = ring.stats();
333    /// assert_eq!(stats.next_sequence_id, 1); // Still waiting for record 1
334    /// assert_eq!(stats.reorder_buffer_size, 1); // Record 2 is buffered
335    /// ```
336    #[inline]
337    #[must_use]
338    pub fn stats(&self) -> SequenceRingStats {
339        SequenceRingStats {
340            next_sequence_id: self.next_sequence_id,
341            reorder_buffer_size: self.reorder_buffer.len(),
342            max_window_size: self.max_window_size,
343            channel_capacity: self.channel_capacity,
344        }
345    }
346}
347
/// Statistics about sequence ring operation
///
/// Provides visibility into sequence ring health and performance characteristics.
/// Obtained via [`SequenceRing::stats`]; values are a point-in-time snapshot.
#[derive(Debug, Clone)]
pub struct SequenceRingStats {
    /// Next expected sequence ID
    ///
    /// The sequence number of the next record to be emitted.
    pub next_sequence_id: u64,

    /// Current reorder buffer size
    ///
    /// Number of out-of-order records currently buffered. High values indicate
    /// significant processing variance between workers.
    pub reorder_buffer_size: usize,

    /// Maximum reorder window size
    ///
    /// Warning threshold for reorder buffer size.
    pub max_window_size: usize,

    /// Channel capacity
    ///
    /// Maximum records in flight between workers and consumer.
    pub channel_capacity: usize,
}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crossbeam_channel::TryRecvError;

    #[test]
    fn sequenced_record_creation() {
        let record = SequencedRecord::new(42, "test data");
        assert_eq!(record.sequence_id, 42);
        assert_eq!(record.data, "test data");
    }

    #[test]
    fn sequenced_record_clone() {
        let record = SequencedRecord::new(1, vec![10, 20, 30]);
        let cloned = record.clone();
        assert_eq!(cloned.sequence_id, 1);
        assert_eq!(cloned.data, vec![10, 20, 30]);
    }

    #[test]
    fn sequenced_record_debug_format() {
        let record = SequencedRecord::new(7, "hello");
        let debug = format!("{record:?}");
        assert!(debug.contains('7'));
        assert!(debug.contains("hello"));
    }

    #[test]
    fn recv_ordered_emits_input_order() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        sender.send(SequencedRecord::new(2, "second")).unwrap();
        sender.send(SequencedRecord::new(1, "first")).unwrap();
        sender.send(SequencedRecord::new(3, "third")).unwrap();

        assert_eq!(ring.recv_ordered().unwrap(), Some("first"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("second"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("third"));
    }

    #[test]
    fn recv_ordered_already_in_order() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        sender.send(SequencedRecord::new(1, "a")).unwrap();
        sender.send(SequencedRecord::new(2, "b")).unwrap();
        sender.send(SequencedRecord::new(3, "c")).unwrap();

        assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
    }

    #[test]
    fn recv_ordered_reverse_order() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        sender.send(SequencedRecord::new(3, "c")).unwrap();
        sender.send(SequencedRecord::new(2, "b")).unwrap();
        sender.send(SequencedRecord::new(1, "a")).unwrap();

        assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
    }

    // Replaces the former `recv_ordered_returns_none_on_channel_close`, which
    // constructed a ring it never exercised and admitted in comments that it
    // could not test what its name claimed (the ring's internal sender keeps
    // the channel connected). This test asserts what actually holds: records
    // sent before the external sender drops are still delivered.
    #[test]
    fn recv_ordered_delivers_record_after_external_sender_drops() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();
        sender.send(SequencedRecord::new(1, "only")).unwrap();
        drop(sender);
        assert_eq!(ring.recv_ordered().unwrap(), Some("only"));
    }

    #[test]
    fn try_recv_empty_reports_empty() {
        let mut ring = SequenceRing::<&str>::new(10, 5);
        assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
    }

    #[test]
    fn try_recv_returns_in_order_record() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();
        sender.send(SequencedRecord::new(1, "first")).unwrap();
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
    }

    #[test]
    fn try_recv_buffers_future_record() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        // Send record 2 (expecting 1)
        sender.send(SequencedRecord::new(2, "second")).unwrap();
        // Should buffer it and return Empty
        assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
        // Buffer should contain record 2
        assert_eq!(ring.stats().reorder_buffer_size, 1);

        // Now send record 1
        sender.send(SequencedRecord::new(1, "first")).unwrap();
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
        // Record 2 should now be available from buffer
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("second"));
    }

    #[test]
    fn try_recv_ignores_past_record() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        // Emit record 1 normally
        sender.send(SequencedRecord::new(1, "first")).unwrap();
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));

        // Send duplicate/past record 1 again
        sender.send(SequencedRecord::new(1, "duplicate")).unwrap();
        assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
    }

    #[test]
    fn sender_respects_channel_capacity() {
        let ring = SequenceRing::new(2, 1);
        let sender = ring.sender();

        sender.try_send(SequencedRecord::new(1, "first")).unwrap();
        sender.try_send(SequencedRecord::new(2, "second")).unwrap();

        let full = sender.try_send(SequencedRecord::new(3, "third"));
        assert!(full.is_err());
    }

    #[test]
    fn stats_reflect_progress() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        for i in 1..=5 {
            sender
                .send(SequencedRecord::new(i, format!("record_{i}")))
                .unwrap();
        }

        let before = ring.stats();
        assert_eq!(before.next_sequence_id, 1);
        assert_eq!(before.reorder_buffer_size, 0);
        assert_eq!(before.channel_capacity, 10);
        assert_eq!(before.max_window_size, 5);

        for _ in 1..=5 {
            assert!(ring.recv_ordered().unwrap().is_some());
        }

        let after = ring.stats();
        assert_eq!(after.next_sequence_id, 6);
        assert_eq!(after.reorder_buffer_size, 0);
    }

    #[test]
    fn stats_initial_state() {
        let ring = SequenceRing::<String>::new(100, 50);
        let stats = ring.stats();
        assert_eq!(stats.next_sequence_id, 1);
        assert_eq!(stats.reorder_buffer_size, 0);
        assert_eq!(stats.max_window_size, 50);
        assert_eq!(stats.channel_capacity, 100);
    }

    #[test]
    fn stats_shows_buffered_count() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        // Send records 3, 5 (out of order, both future)
        sender.send(SequencedRecord::new(3, "three")).unwrap();
        sender.send(SequencedRecord::new(5, "five")).unwrap();
        // Force reads to buffer them
        assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
        assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));

        let stats = ring.stats();
        assert_eq!(stats.reorder_buffer_size, 2);
        assert_eq!(stats.next_sequence_id, 1);
    }

    #[test]
    fn multiple_senders_work() {
        let mut ring = SequenceRing::new(10, 5);
        let sender1 = ring.sender();
        let sender2 = ring.sender();

        sender1.send(SequencedRecord::new(1, "from_s1")).unwrap();
        sender2.send(SequencedRecord::new(2, "from_s2")).unwrap();

        assert_eq!(ring.recv_ordered().unwrap(), Some("from_s1"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("from_s2"));
    }

    #[test]
    fn large_gap_reordering() {
        let mut ring = SequenceRing::new(20, 20);
        let sender = ring.sender();

        // Send records 10, 9, 8, ..., 1 (reverse order)
        for i in (1..=10).rev() {
            sender.send(SequencedRecord::new(i, i)).unwrap();
        }

        // Should emit 1..=10 in order
        for expected in 1..=10u64 {
            assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
        }
    }

    #[test]
    fn sequence_ring_stats_clone_and_debug() {
        let ring = SequenceRing::<()>::new(8, 4);
        let stats = ring.stats();
        let cloned = stats.clone();
        assert_eq!(cloned.channel_capacity, 8);
        let debug = format!("{stats:?}");
        assert!(debug.contains("next_sequence_id"));
    }

    // --- Additional coverage ---

    #[test]
    fn single_element_send_recv() {
        let mut ring = SequenceRing::new(1, 1);
        let sender = ring.sender();
        sender.send(SequencedRecord::new(1, 42)).unwrap();
        assert_eq!(ring.recv_ordered().unwrap(), Some(42));
    }

    #[test]
    fn capacity_power_of_two_boundary_2() {
        let mut ring = SequenceRing::new(2, 2);
        let sender = ring.sender();
        sender.send(SequencedRecord::new(2, "b")).unwrap();
        sender.send(SequencedRecord::new(1, "a")).unwrap();
        assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
    }

    #[test]
    fn capacity_power_of_two_boundary_4() {
        let mut ring = SequenceRing::new(4, 4);
        let sender = ring.sender();
        for i in (1..=4).rev() {
            sender.send(SequencedRecord::new(i, i)).unwrap();
        }
        for expected in 1..=4u64 {
            assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
        }
    }

    #[test]
    fn sequence_ids_larger_than_capacity() {
        let mut ring = SequenceRing::new(4, 4);
        let sender = ring.sender();
        // Sequence IDs far beyond capacity
        sender.send(SequencedRecord::new(1, "a")).unwrap();
        sender.send(SequencedRecord::new(2, "b")).unwrap();
        sender.send(SequencedRecord::new(3, "c")).unwrap();
        sender.send(SequencedRecord::new(4, "d")).unwrap();
        for expected in ["a", "b", "c", "d"] {
            assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
        }
        // Continue with IDs well past the capacity
        sender.send(SequencedRecord::new(5, "e")).unwrap();
        sender.send(SequencedRecord::new(6, "f")).unwrap();
        assert_eq!(ring.recv_ordered().unwrap(), Some("e"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("f"));
        assert_eq!(ring.stats().next_sequence_id, 7);
    }

    #[test]
    fn concurrent_senders_from_threads() {
        use std::thread;

        let mut ring = SequenceRing::new(100, 50);
        let sender1 = ring.sender();
        let sender2 = ring.sender();

        let h1 = thread::spawn(move || {
            for i in (1..=10).step_by(2) {
                sender1.send(SequencedRecord::new(i, i)).unwrap();
            }
        });
        let h2 = thread::spawn(move || {
            for i in (2..=10).step_by(2) {
                sender2.send(SequencedRecord::new(i, i)).unwrap();
            }
        });

        h1.join().unwrap();
        h2.join().unwrap();

        for expected in 1..=10u64 {
            assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
        }
    }

    #[test]
    fn empty_ring_try_recv_is_empty() {
        let mut ring = SequenceRing::<i32>::new(8, 4);
        assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
        assert_eq!(ring.stats().next_sequence_id, 1);
        assert_eq!(ring.stats().reorder_buffer_size, 0);
    }

    #[test]
    fn channel_close_drains_buffered_records() {
        let mut ring = SequenceRing::new(10, 10);
        let sender = ring.sender();
        // Send records 3, 2, 1 then close
        sender.send(SequencedRecord::new(3, "c")).unwrap();
        sender.send(SequencedRecord::new(2, "b")).unwrap();
        sender.send(SequencedRecord::new(1, "a")).unwrap();
        drop(sender);
        // Internal sender still alive, so recv_ordered works normally
        assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
        assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
    }

    #[test]
    fn try_recv_disconnected_after_drain() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();
        sender.send(SequencedRecord::new(1, "only")).unwrap();
        drop(sender);
        // First call returns the record
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("only"));
        // After draining, channel still has internal sender so it's Empty not Disconnected
    }

    #[test]
    fn interleaved_try_recv_and_send() {
        let mut ring = SequenceRing::new(10, 5);
        let sender = ring.sender();

        sender.send(SequencedRecord::new(1, "a")).unwrap();
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("a"));

        sender.send(SequencedRecord::new(3, "c")).unwrap();
        assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));

        sender.send(SequencedRecord::new(2, "b")).unwrap();
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("b"));
        assert_eq!(ring.try_recv_ordered().unwrap(), Some("c"));
    }

    #[test]
    fn sequenced_record_with_complex_data() {
        let record = SequencedRecord::new(99, vec![vec![1, 2], vec![3, 4]]);
        assert_eq!(record.sequence_id, 99);
        assert_eq!(record.data.len(), 2);
        assert_eq!(record.data[0], vec![1, 2]);
    }

    #[test]
    fn stats_after_partial_drain() {
        let mut ring = SequenceRing::new(10, 10);
        let sender = ring.sender();
        sender.send(SequencedRecord::new(1, 1)).unwrap();
        sender.send(SequencedRecord::new(2, 2)).unwrap();
        sender.send(SequencedRecord::new(3, 3)).unwrap();

        assert_eq!(ring.recv_ordered().unwrap(), Some(1));
        let stats = ring.stats();
        assert_eq!(stats.next_sequence_id, 2);
    }

    #[test]
    fn large_window_reorder() {
        let mut ring = SequenceRing::new(64, 64);
        let sender = ring.sender();

        // Send 32 records in completely reverse order
        for i in (1..=32).rev() {
            sender.send(SequencedRecord::new(i, i)).unwrap();
        }

        for expected in 1..=32u64 {
            assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
        }
    }
}
779}