pub struct SequenceRing<T> { /* private fields */ }Expand description
Bounded channel with sequence tracking for ordered emission
Provides deterministic output ordering for parallel processing by buffering out-of-order records and emitting them in sequence order. This is critical for maintaining data integrity when processing COBOL records in parallel.
§How It Works
- Workers submit processed records with sequence IDs via sender channel
SequenceRingreceives records (potentially out-of-order)- Out-of-order records are buffered in reorder buffer (
BTreeMap) - Records are emitted in strict sequence order via
recv_ordered()
§Memory Bounds
- Channel capacity - Maximum records in flight between workers and consumer
- Reorder window - Maximum buffered out-of-order records (warns if exceeded)
§Performance Characteristics
- O(log n) insertion/removal from reorder buffer (
BTreeMap) - O(1) emission when records arrive in order (hot path)
- Memory usage - Bounded by channel capacity + reorder window size
§Examples
use copybook_sequence_ring::{SequenceRing, SequencedRecord};
let mut ring = SequenceRing::new(100, 50); // 100 capacity, 50 max window
let sender = ring.sender();
// Simulate workers sending out-of-order results
sender.send(SequencedRecord::new(2, "second")).unwrap();
sender.send(SequencedRecord::new(1, "first")).unwrap();
sender.send(SequencedRecord::new(3, "third")).unwrap();
// Consumer receives in order
assert_eq!(ring.recv_ordered().unwrap(), Some("first"));
assert_eq!(ring.recv_ordered().unwrap(), Some("second"));
assert_eq!(ring.recv_ordered().unwrap(), Some("third"));Implementations§
Source§impl<T> SequenceRing<T>
impl<T> SequenceRing<T>
Sourcepub fn new(channel_capacity: usize, max_window_size: usize) -> Self
pub fn new(channel_capacity: usize, max_window_size: usize) -> Self
Create a new sequence ring with bounded capacity
§Arguments
channel_capacity- Maximum number of records in flight between workers and consumermax_window_size- Maximum buffered out-of-order records (warning threshold)
§Tuning Guidelines
- Channel capacity: Should be 2-4x number of worker threads for good throughput
- Reorder window: Should be
channel_capacity/ 2 to allow for processing variance
§Examples
use copybook_sequence_ring::SequenceRing;
// For 4-worker pool: 16 capacity, 8 window
let ring: SequenceRing<String> = SequenceRing::new(16, 8);
// For 8-worker pool: 32 capacity, 16 window
let ring: SequenceRing<Vec<u8>> = SequenceRing::new(32, 16);Sourcepub fn sender(&self) -> Sender<SequencedRecord<T>>
pub fn sender(&self) -> Sender<SequencedRecord<T>>
Get a sender for workers to submit processed records
Returns a cloneable sender that workers can use to submit processed records back to the sequence ring. Multiple workers can share clones of this sender.
§Examples
use copybook_sequence_ring::{SequenceRing, SequencedRecord};
use std::thread;
let mut ring = SequenceRing::new(10, 5);
let sender1 = ring.sender();
let sender2 = ring.sender(); // Clone for another worker
// Workers can send concurrently
let handle = thread::spawn(move || {
sender1.send(SequencedRecord::new(1, "data")).unwrap();
});
sender2.send(SequencedRecord::new(2, "data")).unwrap();
handle.join().unwrap();Sourcepub fn recv_ordered(&mut self) -> Result<Option<T>, RecvError>
pub fn recv_ordered(&mut self) -> Result<Option<T>, RecvError>
Receive the next record in sequence order. Blocks until the next expected record is available.
§Errors
Returns an error if the channel is disconnected.
Sourcepub fn try_recv_ordered(&mut self) -> Result<Option<T>, TryRecvError>
pub fn try_recv_ordered(&mut self) -> Result<Option<T>, TryRecvError>
Try to receive the next record without blocking.
§Errors
Returns an error if the channel is disconnected or would block.
Sourcepub fn stats(&self) -> SequenceRingStats
pub fn stats(&self) -> SequenceRingStats
Get statistics about the sequence ring
Returns current operational statistics including reorder buffer usage and sequence tracking state.
§Examples
use copybook_sequence_ring::{SequenceRing, SequencedRecord};
use crossbeam_channel::TryRecvError;
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
// Send out-of-order record (seq 2 when expecting 1)
sender.send(SequencedRecord::new(2, "data")).unwrap();
// Force the ring to observe and buffer the out-of-order record
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
let stats = ring.stats();
assert_eq!(stats.next_sequence_id, 1); // Still waiting for record 1
assert_eq!(stats.reorder_buffer_size, 1); // Record 2 is buffered