pub struct BatchedQueue<T> { /* private fields */ }
A thread-safe, high-performance queue that automatically batches items.
BatchedQueue collects individual items until reaching the configured batch size,
then automatically makes the batch available for processing. This batching approach
can significantly improve throughput in high-volume systems by reducing overhead.
§Examples
use batched_queue::{BatchedQueue, BatchedQueueTrait};
use std::thread;
use std::time::Duration;
// Create a queue with batch size of 5
let queue = BatchedQueue::new(5).expect("Failed to create queue");
// Create a sender that can be shared across threads
let sender = queue.create_sender();
// Producer thread
let producer = thread::spawn(move || {
for i in 0..20 {
sender.push(i).expect("Failed to push item");
thread::sleep(Duration::from_millis(10));
}
sender.flush().expect("Failed to flush"); // Send any remaining items
});
// Consumer thread
let consumer = thread::spawn(move || {
let mut all_items = Vec::new();
// Process batches as they become available
while all_items.len() < 20 {
if let Ok(batch) = queue.next_batch_timeout(Duration::from_millis(100)) {
all_items.extend(batch);
}
}
all_items
});
// Wait for threads to complete
producer.join().unwrap();
let result = consumer.join().unwrap();
assert_eq!(result.len(), 20);

Implementations§
impl<T: Send + 'static> BatchedQueue<T>
impl<T: Send + 'static> BatchedQueue<T>
pub fn new(batch_size: usize) -> Result<Self, BatchedQueueError>
pub fn new(batch_size: usize) -> Result<Self, BatchedQueueError>
pub fn new_bounded(
batch_size: usize,
max_batches: usize,
) -> Result<Self, BatchedQueueError>
pub fn new_bounded( batch_size: usize, max_batches: usize, ) -> Result<Self, BatchedQueueError>
Creates a new batched queue with a bounded channel for backpressure.
Using a bounded channel helps control memory usage by limiting the number of batches that can be queued at once. When the channel is full, producers will block when attempting to send a full batch.
§Arguments
batch_size - The number of items to collect before forming a batch
max_batches - The maximum number of batches that can be queued
§Examples
use batched_queue::BatchedQueue;
// Create a queue with batch size 10 and at most 5 batches in the channel
let queue = BatchedQueue::<i32>::new_bounded(10, 5).expect("Failed to create queue");

pub fn create_sender(&self) -> BatchedQueueSender<T>
pub fn create_sender(&self) -> BatchedQueueSender<T>
Creates a sender for this queue that can be shared across threads.
Multiple senders can be created from a single queue, allowing for concurrent producers.
§Returns
A new BatchedQueueSender linked to this queue
§Examples
use batched_queue::BatchedQueue;
use std::thread;
let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
// Create multiple senders for different threads
let sender1 = queue.create_sender();
let sender2 = queue.create_sender();
// Use senders in different threads
let t1 = thread::spawn(move || {
for i in 0..10 {
sender1.push(i).expect("Failed to push item");
}
});
let t2 = thread::spawn(move || {
for i in 10..20 {
sender2.push(i).expect("Failed to push item");
}
});
// Wait for producers to finish
t1.join().unwrap();
t2.join().unwrap();

pub fn close_queue(&self) -> Vec<T>
pub fn close_queue(&self) -> Vec<T>
Takes any items left in the current batch and returns them when shutting down.
This method is useful during controlled shutdown to collect any remaining items that haven’t formed a complete batch.
§Returns
A vector containing any items that were in the current batch
§Examples
use batched_queue::BatchedQueue;
let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
let sender = queue.create_sender();
// Add some items, but not enough to form a complete batch
for i in 0..3 {
sender.push(i).expect("Failed to push item");
}
// Close the queue and get remaining items
let remaining = queue.close_queue();
assert_eq!(remaining.len(), 3);