Skip to main content

SequenceRing

Struct SequenceRing 

Source
pub struct SequenceRing<T> { /* private fields */ }
Expand description

Bounded channel with sequence tracking for ordered emission

Provides deterministic output ordering for parallel processing by buffering out-of-order records and emitting them in sequence order. This is critical for maintaining data integrity when processing COBOL records in parallel.

§How It Works

  1. Workers submit processed records with sequence IDs via sender channel
  2. SequenceRing receives records (potentially out-of-order)
  3. Out-of-order records are buffered in reorder buffer (BTreeMap)
  4. Records are emitted in strict sequence order via recv_ordered()

§Memory Bounds

  • Channel capacity - Maximum records in flight between workers and consumer
  • Reorder window - Maximum buffered out-of-order records (warns if exceeded)

§Performance Characteristics

  • O(log n) insertion/removal from reorder buffer (BTreeMap)
  • O(1) emission when records arrive in order (hot path)
  • Memory usage - Bounded by channel capacity + reorder window size

§Examples

use copybook_sequence_ring::{SequenceRing, SequencedRecord};

let mut ring = SequenceRing::new(100, 50); // 100 capacity, 50 max window
let sender = ring.sender();

// Simulate workers sending out-of-order results
sender.send(SequencedRecord::new(2, "second")).unwrap();
sender.send(SequencedRecord::new(1, "first")).unwrap();
sender.send(SequencedRecord::new(3, "third")).unwrap();

// Consumer receives in order
assert_eq!(ring.recv_ordered().unwrap(), Some("first"));
assert_eq!(ring.recv_ordered().unwrap(), Some("second"));
assert_eq!(ring.recv_ordered().unwrap(), Some("third"));

Implementations§

Source§

impl<T> SequenceRing<T>

Source

pub fn new(channel_capacity: usize, max_window_size: usize) -> SequenceRing<T>

Create a new sequence ring with bounded capacity

§Arguments
  • channel_capacity - Maximum number of records in flight between workers and consumer
  • max_window_size - Maximum buffered out-of-order records (warning threshold)
§Tuning Guidelines
  • Channel capacity: Should be 2-4x number of worker threads for good throughput
  • Reorder window: Should be channel_capacity / 2 to allow for processing variance
§Examples
use copybook_sequence_ring::SequenceRing;

// For 4-worker pool: 16 capacity, 8 window
let ring: SequenceRing<String> = SequenceRing::new(16, 8);

// For 8-worker pool: 32 capacity, 16 window
let ring: SequenceRing<Vec<u8>> = SequenceRing::new(32, 16);
Source

pub fn sender(&self) -> Sender<SequencedRecord<T>>

Get a sender for workers to submit processed records

Returns a cloneable sender that workers can use to submit processed records back to the sequence ring. Multiple workers can share clones of this sender.

§Examples
use copybook_sequence_ring::{SequenceRing, SequencedRecord};
use std::thread;

let mut ring = SequenceRing::new(10, 5);
let sender1 = ring.sender();
let sender2 = ring.sender(); // Clone for another worker

// Workers can send concurrently
let handle = thread::spawn(move || {
    sender1.send(SequencedRecord::new(1, "data")).unwrap();
});

sender2.send(SequencedRecord::new(2, "data")).unwrap();
handle.join().unwrap();
Source

pub fn recv_ordered(&mut self) -> Result<Option<T>, RecvError>

Receive the next record in sequence order. Blocks until the next expected record is available.

§Errors

Returns an error if the channel is disconnected.

Source

pub fn try_recv_ordered(&mut self) -> Result<Option<T>, TryRecvError>

Try to receive the next record without blocking.

§Errors

Returns an error if the channel is disconnected or would block.

Source

pub fn stats(&self) -> SequenceRingStats

Get statistics about the sequence ring

Returns current operational statistics including reorder buffer usage and sequence tracking state.

§Examples
use copybook_sequence_ring::{SequenceRing, SequencedRecord};
use crossbeam_channel::TryRecvError;

let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();

// Send out-of-order record (seq 2 when expecting 1)
sender.send(SequencedRecord::new(2, "data")).unwrap();

// Force the ring to observe and buffer the out-of-order record
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));

let stats = ring.stats();
assert_eq!(stats.next_sequence_id, 1); // Still waiting for record 1
assert_eq!(stats.reorder_buffer_size, 1); // Record 2 is buffered

Trait Implementations§

Source§

impl<T> Debug for SequenceRing<T>
where T: Debug,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<T> Freeze for SequenceRing<T>

§

impl<T> RefUnwindSafe for SequenceRing<T>
where T: RefUnwindSafe,

§

impl<T> Send for SequenceRing<T>
where T: Send,

§

impl<T> Sync for SequenceRing<T>
where T: Send + Sync,

§

impl<T> Unpin for SequenceRing<T>
where T: Unpin,

§

impl<T> UnsafeUnpin for SequenceRing<T>

§

impl<T> UnwindSafe for SequenceRing<T>
where T: RefUnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more