Struct BatchedQueue

Source
pub struct BatchedQueue<T> { /* private fields */ }
Expand description

A thread-safe, high-performance queue that automatically batches items.

BatchedQueue collects individual items until reaching the configured batch size, then automatically makes the batch available for processing. This batching approach can significantly improve throughput in high-volume systems by reducing overhead.

§Examples

use batched_queue::{BatchedQueue, BatchedQueueTrait};

use std::thread;
use std::time::Duration;

// Create a queue with batch size of 5
let queue = BatchedQueue::new(5).expect("Failed to create queue");

// Create a sender that can be shared across threads
let sender = queue.create_sender();

// Producer thread
let producer = thread::spawn(move || {
    for i in 0..20 {
        sender.push(i).expect("Failed to push item");
        thread::sleep(Duration::from_millis(10));
    }
    sender.flush().expect("Failed to flush"); // Send any remaining items
});

// Consumer thread
let consumer = thread::spawn(move || {
    let mut all_items = Vec::new();
     
    // Process batches as they become available
    while all_items.len() < 20 {
        if let Ok(batch) = queue.next_batch_timeout(Duration::from_millis(100)) {
            all_items.extend(batch);
        }
    }
     
    all_items
});

// Wait for threads to complete
producer.join().unwrap();
let result = consumer.join().unwrap();

assert_eq!(result.len(), 20);

Implementations§

Source§

impl<T: Send + 'static> BatchedQueue<T>

Source

pub fn new(batch_size: usize) -> Result<Self, BatchedQueueError>

Creates a new batched queue with the specified batch size and an unbounded channel.

§Arguments
  • batch_size - The number of items to collect before forming a batch
§Examples
use batched_queue::BatchedQueue;

let queue = BatchedQueue::<String>::new(10).expect("Failed to create queue");
Source

pub fn new_bounded( batch_size: usize, max_batches: usize, ) -> Result<Self, BatchedQueueError>

Creates a new batched queue with a bounded channel for backpressure.

Using a bounded channel helps control memory usage by limiting the number of batches that can be queued at once. When the channel is full, producers will block when attempting to send a full batch.

§Arguments
  • batch_size - The number of items to collect before forming a batch
  • max_batches - The maximum number of batches that can be queued
§Examples
use batched_queue::BatchedQueue;

// Create a queue with batch size 10 and at most 5 batches in the channel
let queue = BatchedQueue::<i32>::new_bounded(10, 5).expect("Failed to create queue");
Source

pub fn create_sender(&self) -> BatchedQueueSender<T>

Creates a sender for this queue that can be shared across threads.

Multiple senders can be created from a single queue, allowing for concurrent producers.

§Returns

A new BatchedQueueSender linked to this queue

§Examples
use batched_queue::BatchedQueue;
use std::thread;

let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");

// Create multiple senders for different threads
let sender1 = queue.create_sender();
let sender2 = queue.create_sender();

// Use senders in different threads
let t1 = thread::spawn(move || {
    for i in 0..10 {
        sender1.push(i).expect("Failed to push item");
    }
});

let t2 = thread::spawn(move || {
    for i in 10..20 {
        sender2.push(i).expect("Failed to push item");
    }
});

// Wait for producers to finish
t1.join().unwrap();
t2.join().unwrap();
Source

pub fn close_queue(&self) -> Vec<T>

Takes any items left in the current batch and returns them when shutting down.

This method is useful during controlled shutdown to collect any remaining items that haven’t formed a complete batch.

§Returns

A vector containing any items that were in the current batch

§Examples
use batched_queue::BatchedQueue;

let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
let sender = queue.create_sender();

// Add some items, but not enough to form a complete batch
for i in 0..3 {
    sender.push(i).expect("Failed to push item");
}

// Close the queue and get remaining items
let remaining = queue.close_queue();
assert_eq!(remaining.len(), 3);

Trait Implementations§

Source§

impl<T: Send + 'static> BatchedQueueTrait<T> for BatchedQueue<T>

Source§

fn push(&self, item: T) -> Result<(), BatchedQueueError>

Adds an item to the queue. Read more
Source§

fn try_next_batch(&self) -> Result<Option<Vec<T>>, BatchedQueueError>

Attempts to retrieve the next batch of items without blocking. Read more
Source§

fn next_batch(&self) -> Result<Vec<T>, BatchedQueueError>

Retrieves the next batch of items, blocking until one is available. Read more
Source§

fn next_batch_timeout( &self, timeout: Duration, ) -> Result<Vec<T>, BatchedQueueError>

Retrieves the next batch of items, blocking until one is available or until the timeout expires. Read more
Source§

fn len(&self) -> usize

Returns the current number of items in the queue.
Source§

fn capacity(&self) -> usize

Returns the maximum number of items a batch can hold.
Source§

fn flush(&self) -> Result<(), BatchedQueueError>

Flushes any pending items into a batch, even if the batch is not full. Read more
Source§

fn is_empty(&self) -> bool

Returns true if the queue has no items waiting to be processed.

Auto Trait Implementations§

§

impl<T> Freeze for BatchedQueue<T>

§

impl<T> !RefUnwindSafe for BatchedQueue<T>

§

impl<T> Send for BatchedQueue<T>
where T: Send,

§

impl<T> Sync for BatchedQueue<T>
where T: Send,

§

impl<T> Unpin for BatchedQueue<T>
where T: Unpin,

§

impl<T> !UnwindSafe for BatchedQueue<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.