batched_queue/lib.rs
1//! # `batched-queue`
2//!
3//! A high-performance, highly-concurrent batched queue implementation for Rust.
4//!
5//! `batched-queue` provides an efficient way to collect individual items into batches
6//! for processing, which can significantly improve throughput in high-volume systems.
7//!
8//! ## Features
9//!
10//! - **Batching**: Automatically collects items into batches of a configurable size
11//! - **Thread-safe**: Designed for concurrent usage with multiple producers and consumers
12//! - **Backpressure**: Optional bounded queue to control memory usage
13//! - **Flexible retrieval**: Blocking, non-blocking, and timeout-based batch retrieval
14//! - **Multiple implementations**: Synchronous (default) and asynchronous modes via feature flags
15//!
16//! ## Usage
17//!
18//! By default, the crate provides a synchronous implementation:
19//!
20//! ```rust
21//! use batched_queue::{BatchedQueue, BatchedQueueTrait};
22//!
23//! // Create a queue with batch size of 10
24//! let queue = BatchedQueue::new(10).expect("Failed to create queue");
25//!
26//! // Create a sender that can be shared across threads
27//! let sender = queue.create_sender();
28//!
29//! // Push items to the queue (in real usage, this would be in another thread)
30//! for i in 0..25 {
31//! sender.push(i).expect("Failed to push item");
32//! }
33//!
34//! // Flush any remaining items that haven't formed a complete batch
35//! sender.flush().expect("Failed to flush");
36//!
37//! // Process batches
38//! while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
39//! println!("Processing batch of {} items", batch.len());
40//! for item in batch {
41//! // Process each item
42//! println!(" Item: {}", item);
43//! }
44//! }
45//! ```
46//!
47//! ## Feature Flags
48//!
49//! - [`sync`] (default): Enables the synchronous implementation using [`parking_lot`] and [`crossbeam_channel`]
50//! - `async`: Enables the asynchronous implementation using tokio
51
52use std::time::Duration;
53use thiserror::Error;
54
55// Export the sync BatchedQueue by default
56#[cfg(feature = "sync")]
57pub use sync::BatchedQueue;
58
59/// Error type for a batched queue.
60#[derive(Error, Debug, Clone)]
61pub enum BatchedQueueError {
62 #[error("Channel is full (backpressure limit reached)")]
63 ChannelFull,
64
65 #[error("Channel is disconnected (all receivers dropped)")]
66 Disconnected,
67
68 #[error("Operation timed out after {0:?}")]
69 Timeout(Duration),
70
71 #[error("Queue capacity exceeded: tried to add more than {max_capacity} items")]
72 CapacityExceeded { max_capacity: usize },
73
74 #[error("Invalid batch size: {0}")]
75 InvalidBatchSize(usize),
76
77 #[error("Failed to send batch: {0}")]
78 SendError(String),
79
80 #[error("Failed to receive batch: {0}")]
81 ReceiveError(String),
82}
83
84/// Contextual information about [`BatchedQueueError`].
85#[derive(Debug, Clone)]
86pub struct ErrorContext {
87 pub operation: String,
88 pub queue_info: String,
89}
90
91impl BatchedQueueError {
92 pub fn timeout(duration: Duration) -> Self {
93 BatchedQueueError::Timeout(duration)
94 }
95
96 pub fn capacity_exceeded(max_capacity: usize) -> Self {
97 BatchedQueueError::CapacityExceeded { max_capacity }
98 }
99}
100
101/// Defines the common interface for batched queue implementations.
102///
103/// This trait provides methods for adding items to a queue, retrieving
104/// batches of items, and checking queue status. All implementations must
105/// handle the buffering of items until they form complete batches, and
106/// provide mechanisms for flushing partial batches when needed.
107///
108/// # Examples
109///
110/// ```
111/// use batched_queue::{BatchedQueue, BatchedQueueTrait};
112///
113/// // Create a queue with batch size of 10
114/// let queue = BatchedQueue::new(10).expect("Failed to create queue");
115///
116/// // Create a sender that can be shared across threads
117/// let sender = queue.create_sender();
118///
119/// // Push items to the queue (in real usage, this would be in another thread)
120/// for i in 0..25 {
121/// sender.push(i).expect("Failed to push item");
122/// }
123///
124/// // Flush any remaining items that haven't formed a complete batch
125/// sender.flush().expect("Failed to flush");
126///
127/// // Process batches
128/// while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
129/// println!("Processing batch of {} items", batch.len());
130/// for item in batch {
131/// // Process each item
132/// println!(" Item: {}", item);
133/// }
134/// }
135/// ```
136pub trait BatchedQueueTrait<T> {
137 /// Returns the current number of items in the queue.
138 fn len(&self) -> usize;
139
140 /// Returns the maximum number of items a batch can hold.
141 fn capacity(&self) -> usize;
142
143 /// Returns `true` if the queue has no items waiting to be processed.
144 fn is_empty(&self) -> bool;
145
146 /// Adds an item to the queue.
147 ///
148 /// If adding this item causes the current batch to reach the configured
149 /// batch size, the batch will be automatically sent for processing.
150 ///
151 /// # Arguments
152 ///
153 /// * `item` - The item to add to the queue
154 ///
155 /// # Errors
156 ///
157 /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped,
158 /// or other implementation-specific errors.
159 fn push(&self, item: T) -> Result<(), BatchedQueueError>;
160
161 /// Attempts to retrieve the next batch of items without blocking.
162 ///
163 /// # Returns
164 ///
165 /// * `Ok(Some(batch))` - A batch of items is available
166 /// * `Ok(None)` - No batch is currently available
167 ///
168 /// # Errors
169 ///
170 /// Returns `BatchedQueueError::Disconnected` if the sending end has been dropped,
171 /// or other implementation-specific errors.
172 fn try_next_batch(&self) -> Result<Option<Vec<T>>, BatchedQueueError>;
173
174 /// Retrieves the next batch of items, blocking until one is available.
175 ///
176 /// # Errors
177 ///
178 /// Returns `BatchedQueueError::Disconnected` if the sending end has been dropped,
179 /// or other implementation-specific errors.
180 fn next_batch(&self) -> Result<Vec<T>, BatchedQueueError>;
181
182 /// Retrieves the next batch of items, blocking until one is available or until the timeout expires.
183 ///
184 /// # Arguments
185 ///
186 /// * `timeout` - Maximum time to wait for a batch to become available
187 ///
188 /// # Errors
189 ///
190 /// Returns:
191 /// * `BatchedQueueError::Timeout` if no batch becomes available within the timeout period
192 /// * `BatchedQueueError::Disconnected` if the sending end has been dropped
193 /// * Other implementation-specific errors
194 fn next_batch_timeout(&self, timeout: std::time::Duration)
195 -> Result<Vec<T>, BatchedQueueError>;
196
197 /// Flushes any pending items into a batch, even if the batch is not full.
198 ///
199 /// This is useful for ensuring that all items are processed, especially
200 /// during shutdown or when batches need to be processed on demand.
201 ///
202 /// # Errors
203 ///
204 /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped,
205 /// or other implementation-specific errors.
206 fn flush(&self) -> Result<(), BatchedQueueError>;
207}
208
209#[cfg(feature = "sync")]
210pub mod sync {
211 //! Synchronous implementation of the batched queue.
212 //!
213 //! This module provides a thread-safe implementation using [`crossbeam_channel`]
214 //! for communication and [`parking_lot::Mutex`] for low-contention locking.
215 //! It is designed for high-throughput scenarios where items need to be
216 //! processed in batches.
217
218 use super::*;
219 use crossbeam_channel as channel;
220 use parking_lot::Mutex;
221 use std::sync::Arc;
222 use std::sync::atomic::{AtomicUsize, Ordering};
223
224 /// A thread-safe, high-performance queue that automatically batches items.
225 ///
226 /// [`BatchedQueue`] collects individual items until reaching the configured batch size,
227 /// then automatically makes the batch available for processing. This batching approach
228 /// can significantly improve throughput in high-volume systems by reducing overhead.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use batched_queue::{BatchedQueue, BatchedQueueTrait};
234 ///
235 /// use std::thread;
236 /// use std::time::Duration;
237 ///
238 /// // Create a queue with batch size of 5
239 /// let queue = BatchedQueue::new(5).expect("Failed to create queue");
240 ///
241 /// // Create a sender that can be shared across threads
242 /// let sender = queue.create_sender();
243 ///
244 /// // Producer thread
245 /// let producer = thread::spawn(move || {
246 /// for i in 0..20 {
247 /// sender.push(i).expect("Failed to push item");
248 /// thread::sleep(Duration::from_millis(10));
249 /// }
250 /// sender.flush().expect("Failed to flush"); // Send any remaining items
251 /// });
252 ///
253 /// // Consumer thread
254 /// let consumer = thread::spawn(move || {
255 /// let mut all_items = Vec::new();
256 ///
257 /// // Process batches as they become available
258 /// while all_items.len() < 20 {
259 /// if let Ok(batch) = queue.next_batch_timeout(Duration::from_millis(100)) {
260 /// all_items.extend(batch);
261 /// }
262 /// }
263 ///
264 /// all_items
265 /// });
266 ///
267 /// // Wait for threads to complete
268 /// producer.join().unwrap();
269 /// let result = consumer.join().unwrap();
270 ///
271 /// assert_eq!(result.len(), 20);
272 /// ```
273 pub struct BatchedQueue<T> {
274 batch_size: usize,
275 current_batch: Arc<Mutex<Vec<T>>>,
276 batch_receiver: channel::Receiver<Vec<T>>,
277 batch_sender: channel::Sender<Vec<T>>,
278 item_count: Arc<AtomicUsize>,
279 }
280
281 impl<T: Send + 'static> BatchedQueue<T> {
282 /// Creates a new batched queue with the specified batch size and an unbounded channel.
283 ///
284 /// # Arguments
285 ///
286 /// * `batch_size` - The number of items to collect before forming a batch
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// use batched_queue::BatchedQueue;
292 ///
293 /// let queue = BatchedQueue::<String>::new(10).expect("Failed to create queue");
294 /// ```
295 pub fn new(batch_size: usize) -> Result<Self, BatchedQueueError> {
296 if batch_size == 0 {
297 return Err(BatchedQueueError::InvalidBatchSize(batch_size));
298 }
299
300 let (batch_sender, batch_receiver) = channel::unbounded();
301 Ok(Self {
302 batch_size,
303 current_batch: Arc::new(Mutex::new(Vec::with_capacity(batch_size))),
304 batch_receiver,
305 batch_sender,
306 item_count: Arc::new(AtomicUsize::new(0)),
307 })
308 }
309
310 /// Creates a new batched queue with a bounded channel for backpressure.
311 ///
312 /// Using a bounded channel helps control memory usage by limiting the number
313 /// of batches that can be queued at once. When the channel is full, producers
314 /// will block when attempting to send a full batch.
315 ///
316 /// # Arguments
317 ///
318 /// * `batch_size` - The number of items to collect before forming a batch
319 /// * `max_batches` - The maximum number of batches that can be queued
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use batched_queue::BatchedQueue;
325 ///
326 /// // Create a queue with batch size 10 and at most 5 batches in the channel
327 /// let queue = BatchedQueue::<i32>::new_bounded(10, 5).expect("Failed to create queue");
328 /// ```
329 pub fn new_bounded(
330 batch_size: usize,
331 max_batches: usize,
332 ) -> Result<Self, BatchedQueueError> {
333 if batch_size == 0 {
334 return Err(BatchedQueueError::InvalidBatchSize(batch_size));
335 }
336 if max_batches == 0 {
337 return Err(BatchedQueueError::InvalidBatchSize(max_batches));
338 }
339
340 let (batch_sender, batch_receiver) = channel::bounded(max_batches);
341 Ok(Self {
342 batch_size,
343 current_batch: Arc::new(Mutex::new(Vec::with_capacity(batch_size))),
344 batch_receiver,
345 batch_sender,
346 item_count: Arc::new(AtomicUsize::new(0)),
347 })
348 }
349
350 /// Creates a sender for this queue that can be shared across threads.
351 ///
352 /// Multiple senders can be created from a single queue, allowing
353 /// for concurrent producers.
354 ///
355 /// # Returns
356 ///
357 /// A new [`BatchedQueueSender`] linked to this queue
358 ///
359 /// # Examples
360 ///
361 /// ```
362 /// use batched_queue::BatchedQueue;
363 /// use std::thread;
364 ///
365 /// let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
366 ///
367 /// // Create multiple senders for different threads
368 /// let sender1 = queue.create_sender();
369 /// let sender2 = queue.create_sender();
370 ///
371 /// // Use senders in different threads
372 /// let t1 = thread::spawn(move || {
373 /// for i in 0..10 {
374 /// sender1.push(i).expect("Failed to push item");
375 /// }
376 /// });
377 ///
378 /// let t2 = thread::spawn(move || {
379 /// for i in 10..20 {
380 /// sender2.push(i).expect("Failed to push item");
381 /// }
382 /// });
383 ///
384 /// // Wait for producers to finish
385 /// t1.join().unwrap();
386 /// t2.join().unwrap();
387 /// ```
388 pub fn create_sender(&self) -> BatchedQueueSender<T> {
389 BatchedQueueSender {
390 batch_size: self.batch_size,
391 current_batch: self.current_batch.clone(),
392 batch_sender: self.batch_sender.clone(),
393 item_count: self.item_count.clone(),
394 }
395 }
396
397 /// Takes any items left in the current batch and returns them when shutting down.
398 ///
399 /// This method is useful during controlled shutdown to collect any remaining items
400 /// that haven't formed a complete batch.
401 ///
402 /// # Returns
403 ///
404 /// A vector containing any items that were in the current batch
405 ///
406 /// # Examples
407 ///
408 /// ```
409 /// use batched_queue::BatchedQueue;
410 ///
411 /// let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
412 /// let sender = queue.create_sender();
413 ///
414 /// // Add some items, but not enough to form a complete batch
415 /// for i in 0..3 {
416 /// sender.push(i).expect("Failed to push item");
417 /// }
418 ///
419 /// // Close the queue and get remaining items
420 /// let remaining = queue.close_queue();
421 /// assert_eq!(remaining.len(), 3);
422 /// ```
423 pub fn close_queue(&self) -> Vec<T> {
424 // Take any items left in the current batch
425 let mut batch = self.current_batch.lock();
426 std::mem::take(&mut *batch)
427 }
428 }
429
430 impl<T: Send + 'static> BatchedQueueTrait<T> for BatchedQueue<T> {
431 fn push(&self, item: T) -> Result<(), BatchedQueueError> {
432 let mut batch = self.current_batch.lock();
433 batch.push(item);
434
435 let count = self.item_count.fetch_add(1, Ordering::SeqCst);
436
437 if count % self.batch_size == self.batch_size - 1 {
438 let full_batch =
439 std::mem::replace(&mut *batch, Vec::with_capacity(self.batch_size));
440
441 self.batch_sender
442 .send(full_batch)
443 .map_err(|_| BatchedQueueError::Disconnected)?;
444 }
445
446 Ok(())
447 }
448
449 fn try_next_batch(&self) -> Result<Option<Vec<T>>, BatchedQueueError> {
450 match self.batch_receiver.try_recv() {
451 Ok(batch) => Ok(Some(batch)),
452 Err(channel::TryRecvError::Empty) => Ok(None),
453 Err(channel::TryRecvError::Disconnected) => Err(BatchedQueueError::Disconnected),
454 }
455 }
456
457 fn next_batch(&self) -> Result<Vec<T>, BatchedQueueError> {
458 self.batch_receiver
459 .recv()
460 .map_err(|_| BatchedQueueError::Disconnected)
461 }
462
463 fn next_batch_timeout(
464 &self,
465 timeout: std::time::Duration,
466 ) -> Result<Vec<T>, BatchedQueueError> {
467 match self.batch_receiver.recv_timeout(timeout) {
468 Ok(batch) => Ok(batch),
469 Err(channel::RecvTimeoutError::Timeout) => Err(BatchedQueueError::Timeout(timeout)),
470 Err(channel::RecvTimeoutError::Disconnected) => {
471 Err(BatchedQueueError::Disconnected)
472 }
473 }
474 }
475
476 fn len(&self) -> usize {
477 self.item_count.load(Ordering::SeqCst)
478 }
479
480 fn capacity(&self) -> usize {
481 self.batch_size
482 }
483
484 fn flush(&self) -> Result<(), BatchedQueueError> {
485 let mut batch = self.current_batch.lock();
486 if !batch.is_empty() {
487 let partial_batch =
488 std::mem::replace(&mut *batch, Vec::with_capacity(self.batch_size));
489 self.batch_sender
490 .send(partial_batch)
491 .map_err(|_| BatchedQueueError::Disconnected)?;
492 }
493 Ok(())
494 }
495
496 fn is_empty(&self) -> bool {
497 self.batch_receiver.is_empty() && self.current_batch.lock().is_empty()
498 }
499 }
500
501 /// A sender handle for adding items to a batched queue.
502 ///
503 /// `BatchedQueueSender` provides methods to add items to a batched queue from multiple
504 /// threads. It handles the details of batch management and automatic flushing of batches
505 /// when they reach the configured size.
506 ///
507 /// # Examples
508 ///
509 /// ```
510 /// use batched_queue::BatchedQueue;
511 /// use std::thread;
512 ///
513 /// let queue = BatchedQueue::<String>::new(5).expect("Failed to create queue");
514 /// let sender = queue.create_sender();
515 ///
516 /// // Share the sender with another thread
517 /// thread::spawn(move || {
518 /// for i in 0..10 {
519 /// sender.push(format!("Item {}", i)).expect("Failed to push item");
520 /// }
521 ///
522 /// // Ensure any remaining items are sent
523 /// sender.flush().expect("Failed to flush");
524 /// });
525 /// ```
526 pub struct BatchedQueueSender<T> {
527 batch_size: usize,
528 current_batch: Arc<Mutex<Vec<T>>>,
529 batch_sender: channel::Sender<Vec<T>>,
530 item_count: Arc<AtomicUsize>,
531 }
532
533 impl<T: Send + 'static> Clone for BatchedQueueSender<T> {
534 fn clone(&self) -> Self {
535 Self {
536 batch_size: self.batch_size,
537 current_batch: self.current_batch.clone(),
538 batch_sender: self.batch_sender.clone(),
539 item_count: self.item_count.clone(),
540 }
541 }
542 }
543
544 impl<T: Send + Clone + 'static> BatchedQueueSender<T> {
545 /// Adds an item to the queue.
546 ///
547 /// If adding this item causes the current batch to reach the configured
548 /// batch size, the batch will be automatically sent for processing.
549 /// This method will block if the channel is bounded and full.
550 ///
551 /// # Arguments
552 ///
553 /// * `item` - The item to add to the queue
554 ///
555 /// # Examples
556 ///
557 /// ```
558 /// use batched_queue::BatchedQueue;
559 ///
560 /// let queue = BatchedQueue::<i32>::new(5).expect("Failed to create queue");
561 /// let sender = queue.create_sender();
562 ///
563 /// for i in 0..10 {
564 /// sender.push(i).expect("Failed to push item");
565 /// }
566 /// ```
567 pub fn push(&self, item: T) -> Result<(), BatchedQueueError> {
568 let should_send_batch;
569 let mut full_batch = None;
570
571 {
572 let mut batch = self.current_batch.lock();
573 batch.push(item);
574
575 let count = self.item_count.fetch_add(1, Ordering::SeqCst);
576 should_send_batch = count % self.batch_size == self.batch_size - 1;
577
578 if should_send_batch {
579 // Take the batch but minimize time in the critical section
580 full_batch = Some(std::mem::replace(
581 &mut *batch,
582 Vec::with_capacity(self.batch_size),
583 ));
584 }
585 }
586
587 if let Some(batch) = full_batch {
588 self.batch_sender
589 .send(batch)
590 .map_err(|_| BatchedQueueError::Disconnected)?;
591 }
592
593 Ok(())
594 }
595
596 /// Attempts to add an item to the queue without blocking.
597 ///
598 /// This method is similar to `push`, but it will not block if the channel
599 /// is bounded and full. Instead, if a full batch cannot be sent because
600 /// the channel is full, the batch is kept in the current batch and will
601 /// be sent on a future push or flush operation.
602 ///
603 /// # Arguments
604 ///
605 /// * `item` - The item to add to the queue
606 ///
607 /// # Examples
608 ///
609 /// ```
610 /// use batched_queue::BatchedQueue;
611 ///
612 /// // Create a queue with limited capacity
613 /// let queue = BatchedQueue::<i32>::new_bounded(5, 1).expect("Failed to create queue");
614 /// let sender = queue.create_sender();
615 ///
616 /// for i in 0..20 {
617 /// sender.try_push(i);
618 /// }
619 /// ```
620 pub fn try_push(&self, item: T) -> Result<(), BatchedQueueError> {
621 let should_send_batch;
622 let mut full_batch = None;
623
624 {
625 let mut batch = self.current_batch.lock();
626 batch.push(item);
627
628 let count = self.item_count.fetch_add(1, Ordering::SeqCst);
629 should_send_batch = count % self.batch_size == self.batch_size - 1;
630
631 if should_send_batch {
632 // Take the batch but minimize time in the critical section
633 full_batch = Some(std::mem::replace(
634 &mut *batch,
635 Vec::with_capacity(self.batch_size),
636 ));
637 }
638 }
639
640 if let Some(batch_to_send) = full_batch {
641 match self.batch_sender.try_send(batch_to_send.clone()) {
642 Ok(_) => {}
643 Err(channel::TrySendError::Full(_)) => {
644 // If channel is full, we need to restore the batch
645 {
646 let mut batch = self.current_batch.lock();
647 // This assumes the batch is empty, which it should be after the replace above
648 *batch = batch_to_send;
649 }
650 // We didn't actually send a batch, so decrement the count
651 self.item_count.fetch_sub(1, Ordering::SeqCst);
652 return Err(BatchedQueueError::ChannelFull);
653 }
654 Err(channel::TrySendError::Disconnected(_)) => {
655 return Err(BatchedQueueError::Disconnected);
656 }
657 }
658 }
659
660 Ok(())
661 }
662
663 /// Flushes any pending items into a batch, even if the batch is not full.
664 ///
665 /// This method will block if the channel is bounded and full.
666 ///
667 /// # Error
668 ///
669 /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped,
670 ///
671 /// # Examples
672 ///
673 /// ```
674 /// use batched_queue::BatchedQueue;
675 ///
676 /// let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
677 /// let sender = queue.create_sender();
678 ///
679 /// // Add some items, but not enough to form a complete batch
680 /// for i in 0..3 {
681 /// sender.push(i).expect("Failed to push item");
682 /// }
683 ///
684 /// // Flush to ensure items are sent for processing
685 /// sender.flush().expect("Failed to flush");
686 /// ```
687 pub fn flush(&self) -> Result<(), BatchedQueueError> {
688 let mut batch = self.current_batch.lock();
689 if !batch.is_empty() {
690 let partial_batch =
691 std::mem::replace(&mut *batch, Vec::with_capacity(self.batch_size));
692 self.batch_sender
693 .send(partial_batch)
694 .map_err(|_| BatchedQueueError::Disconnected)?;
695 }
696 Ok(())
697 }
698
699 /// Attempts to flush any pending items without blocking.
700 ///
701 /// # Errors
702 ///
703 /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped,
704 /// or `BatchedQueueError::ChannelFull` if the channel is full.
705 ///
706 /// # Examples
707 ///
708 /// ```
709 /// use batched_queue::BatchedQueue;
710 ///
711 /// // Create a queue with limited capacity
712 /// let queue = BatchedQueue::<i32>::new_bounded(5, 1).expect("Failed to create queue");
713 /// let sender = queue.create_sender();
714 ///
715 /// for i in 0..3 {
716 /// sender.push(i).expect("Failed to push item");
717 /// }
718 ///
719 /// // Try to flush without blocking
720 /// if !sender.try_flush().is_ok() {
721 /// println!("Channel is full, will try again later");
722 /// }
723 /// ```
724 pub fn try_flush(&self) -> Result<(), BatchedQueueError> {
725 let mut batch = self.current_batch.lock();
726 if !batch.is_empty() {
727 let partial_batch =
728 std::mem::replace(&mut *batch, Vec::with_capacity(self.batch_size));
729 match self.batch_sender.try_send(partial_batch) {
730 Ok(_) => Ok(()),
731 Err(channel::TrySendError::Full(_)) => Err(BatchedQueueError::ChannelFull),
732 Err(channel::TrySendError::Disconnected(_)) => {
733 Err(BatchedQueueError::Disconnected)
734 }
735 }
736 } else {
737 Ok(())
738 }
739 }
740 }
741
742 // For testing thread-safe behavior
743 #[cfg(test)]
744 mod tests {
745 use super::*;
746 use std::thread;
747 use std::time::Duration;
748
749 #[test]
750 fn multithreaded() {
751 let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
752 let sender1 = queue.create_sender();
753 let sender2 = queue.create_sender();
754
755 // Thread 1: Push numbers 0-49
756 let t1 = thread::spawn(move || {
757 for i in 0..50 {
758 sender1.push(i).expect("Failed to push item");
759 thread::sleep(Duration::from_millis(1));
760 }
761 sender1.flush().expect("Failed to flush");
762 });
763
764 // Thread 2: Push numbers 100-149
765 let t2 = thread::spawn(move || {
766 for i in 100..150 {
767 sender2.push(i).expect("Failed to push item");
768 thread::sleep(Duration::from_millis(1));
769 }
770 sender2.flush().expect("Failed to flush");
771 });
772
773 // Consumer thread: Collect all batches
774 let t3 = thread::spawn(move || {
775 let mut all_items = Vec::new();
776
777 // Collect for a reasonable amount of time
778 for _ in 0..15 {
779 if let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
780 all_items.extend(batch);
781 }
782 thread::sleep(Duration::from_millis(10));
783 }
784
785 // Make sure we got all remaining batches
786 while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
787 all_items.extend(batch);
788 }
789
790 all_items
791 });
792
793 // Wait for producer threads to finish
794 t1.join().unwrap();
795 t2.join().unwrap();
796
797 // Get results from consumer
798 let result = t3.join().unwrap();
799
800 // Verify we have all 100 items
801 assert_eq!(result.len(), 100);
802
803 // Check that we have all numbers
804 let mut result_sorted = result.clone();
805 result_sorted.sort();
806
807 // Verify we got all numbers 0-49 and 100-149
808 for i in 0..50 {
809 assert!(result_sorted.contains(&i));
810 assert!(result_sorted.contains(&(i + 100)));
811 }
812 }
813
814 #[test]
815 fn timeout() {
816 let queue = BatchedQueue::<i32>::new(5).expect("Failed to create queue");
817 let sender = queue.create_sender();
818
819 // Add 3 items (not enough to trigger a batch)
820 for i in 1..4 {
821 sender.push(i).unwrap();
822 }
823
824 // Try to get a batch with a short timeout - should time out
825 let result = queue.next_batch_timeout(Duration::from_millis(10));
826 assert!(result.is_err());
827
828 // Now flush the incomplete batch
829 sender.flush().unwrap();
830
831 // Should get the batch now
832 let batch = queue.next_batch_timeout(Duration::from_millis(10)).unwrap();
833 assert_eq!(batch, vec![1, 2, 3]);
834 }
835
836 #[test]
837 fn bounded_channel() {
838 // Create a bounded queue with batch size 5 and max 2 batches in the channel
839 let queue = BatchedQueue::new_bounded(5, 2).expect("Failed to create queue");
840 let sender = queue.create_sender();
841
842 // Producer thread
843 let handle = thread::spawn(move || {
844 let mut successful_pushes = 0;
845 // Try to push 20 items
846 for item_idx in 0..20 {
847 // Use push which will block if the channel is full
848 sender.push(item_idx).expect("Failed to push item");
849 successful_pushes += 1;
850
851 // Add a small delay
852 if item_idx % 5 == 4 {
853 // Every 5th item, wait a bit longer
854 thread::sleep(Duration::from_millis(5));
855 }
856 }
857 sender.flush().expect("Failed to flush");
858 successful_pushes
859 });
860
861 // Consumer thread - retrieve batches to prevent deadlock
862 let mut received_batches = 0;
863 let mut all_items = Vec::new();
864
865 // Receive batches while the producer is running
866 while received_batches < 4 {
867 // Expect 4 full batches of 5 items each
868 if let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
869 received_batches += 1;
870 all_items.extend(batch);
871 }
872 thread::sleep(Duration::from_millis(5));
873 }
874
875 // Wait for producer to finish
876 let successful_pushes = handle.join().unwrap();
877
878 // Receive any remaining batches
879 while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
880 all_items.extend(batch);
881 }
882
883 // Should have all 20 items
884 assert_eq!(all_items.len(), 20);
885 assert_eq!(successful_pushes, 20);
886
887 // Verify we have all numbers 0-19
888 let mut sorted_items = all_items.clone();
889 sorted_items.sort();
890 for i in 0..20 {
891 assert!(sorted_items.contains(&i));
892 }
893 }
894
895 #[test]
896 fn backpressure() {
897 // Create a bounded queue with backpressure
898 let queue = BatchedQueue::new_bounded(5, 1).expect("Failed to create queue"); // Only 1 batch in the channel
899 let sender = queue.create_sender();
900
901 // Fill the first batch and send it
902 for i in 0..5 {
903 sender.push(i).expect("Failed to push item");
904 }
905 // Now the batch is automatically sent because it's full
906
907 // Create a partial second batch
908 for i in 5..8 {
909 sender.push(i).expect("Failed to push item");
910 }
911
912 // At this point, we have one full batch in the channel and a partial batch in current_batch
913
914 // Get the first batch to make room in the channel
915 let batch = queue.next_batch().expect("Failed to get batch");
916 assert_eq!(batch, vec![0, 1, 2, 3, 4]);
917
918 // Now flush the partial batch - this should succeed
919 assert!(sender.try_flush().is_ok());
920
921 // And we should get the second batch
922 let batch = queue
923 .next_batch_timeout(Duration::from_millis(50))
924 .expect("Failed to get batch");
925 assert_eq!(batch, vec![5, 6, 7]);
926 }
927
928 #[test]
929 fn error_handling() {
930 // Test invalid batch size
931 let invalid_queue = BatchedQueue::<i32>::new(0);
932 assert!(matches!(
933 invalid_queue,
934 Err(BatchedQueueError::InvalidBatchSize(0))
935 ));
936
937 // Create a queue with very limited capacity - only 1 batch in the channel
938 let limited_queue = BatchedQueue::new_bounded(5, 1).expect("Failed to create queue");
939 let limited_sender = limited_queue.create_sender();
940
941 // Fill the channel with one complete batch
942 for i in 0..5 {
943 limited_sender
944 .push(i)
945 .expect("Should succeed for first batch");
946 }
947
948 // At this point, we have one full batch in the channel
949 // Now, add items to start building a second batch
950 for i in 5..9 {
951 limited_sender
952 .push(i)
953 .expect("Should succeed as we're building a partial batch");
954 }
955
956 // Try to complete the second batch, which should fail with ChannelFull
957 // because when it completes, it immediately tries to send it
958 let result = limited_sender.try_push(9);
959 assert!(matches!(result, Err(BatchedQueueError::ChannelFull)));
960
961 // Now let's test the timeout
962 // First ensure there's nothing ready in the queue by consuming the batch
963 limited_queue
964 .next_batch()
965 .expect("Should get the first batch");
966
967 // Now we should have nothing in the channel and only a partial batch
968 // Try to get a batch with a very short timeout
969 let result = limited_queue.next_batch_timeout(Duration::from_millis(1));
970 assert!(matches!(result, Err(BatchedQueueError::Timeout(_))));
971 }
972
973 #[cfg(test)]
974 mod stress_tests {
975 use super::*;
976 use std::collections::HashSet;
977 use std::sync::atomic::{AtomicUsize, Ordering};
978 use std::sync::{Arc, Barrier};
979 use std::thread;
980 use std::time::{Duration, Instant};
981
982 #[test]
983 fn batched_queue() {
984 // Configuration parameters
985 const BATCH_SIZE: usize = 100;
986 const CHANNEL_CAPACITY: usize = 10;
987 const PRODUCER_COUNT: usize = 64;
988 const ITEMS_PER_PRODUCER: usize = 10_000;
989 const CONSUMER_COUNT: usize = 4;
990
991 // Wrap the queue in an Arc so it can be safely shared between threads
992 let queue = Arc::new(
993 BatchedQueue::new_bounded(BATCH_SIZE, CHANNEL_CAPACITY)
994 .expect("Failed to create queue"),
995 );
996
997 // Setup synchronization primitives
998 let start_barrier = Arc::new(Barrier::new(PRODUCER_COUNT + CONSUMER_COUNT + 1));
999 let total_expected_items = PRODUCER_COUNT * ITEMS_PER_PRODUCER;
1000 let processed_items = Arc::new(AtomicUsize::new(0));
1001
1002 // Tracking data structures for verification
1003 let all_produced_items = Arc::new(parking_lot::Mutex::new(HashSet::new()));
1004 let all_consumed_items = Arc::new(parking_lot::Mutex::new(HashSet::new()));
1005
1006 // Track performance metrics
1007 let producer_times = Arc::new(parking_lot::Mutex::new(Vec::new()));
1008 let consumer_times = Arc::new(parking_lot::Mutex::new(Vec::new()));
1009
1010 // Create and launch producer threads
1011 let producer_handles: Vec<_> = (0..PRODUCER_COUNT)
1012 .map(|producer_id| {
1013 let queue_sender = queue.create_sender();
1014 let start = start_barrier.clone();
1015 let produced = all_produced_items.clone();
1016 let producer_timing = producer_times.clone();
1017
1018 thread::spawn(move || {
1019 // Wait for all threads to be ready
1020 start.wait();
1021 let start_time = Instant::now();
1022
1023 // Producer offset ensures each producer generates unique values
1024 let offset = producer_id * ITEMS_PER_PRODUCER;
1025
1026 // Track items we produced in this thread
1027 let mut local_produced = HashSet::new();
1028
1029 for i in 0..ITEMS_PER_PRODUCER {
1030 let item = offset + i;
1031 queue_sender.push(item).expect("Failed to push item");
1032 local_produced.insert(item);
1033
1034 // Occasionally sleep to create more contention patterns
1035 if i % 1000 == 0 {
1036 thread::sleep(Duration::from_micros(10));
1037 }
1038 }
1039
1040 // Ensure final batch is sent
1041 queue_sender.flush().expect("Failed to flush");
1042
1043 // Record items this producer generated
1044 let mut global_produced = produced.lock();
1045 for item in local_produced {
1046 global_produced.insert(item);
1047 }
1048
1049 let elapsed = start_time.elapsed();
1050 producer_timing.lock().push(elapsed);
1051
1052 println!("Producer {}: Finished in {:?}", producer_id, elapsed);
1053 })
1054 })
1055 .collect();
1056
1057 // Create and launch consumer threads
1058 let consumer_handles: Vec<_> = (0..CONSUMER_COUNT)
1059 .map(|consumer_id| {
1060 let queue = queue.clone(); // Clone the Arc, not the queue itself
1061 let start = start_barrier.clone();
1062 let processed = processed_items.clone();
1063 let consumed = all_consumed_items.clone();
1064 let consumer_timing = consumer_times.clone();
1065
1066 thread::spawn(move || {
1067 // Wait for all threads to be ready
1068 start.wait();
1069 let start_time = Instant::now();
1070
1071 // Track items consumed by this thread
1072 let mut local_consumed = HashSet::new();
1073 let mut batches_processed = 0;
1074
1075 loop {
1076 // Try to get a batch with timeout
1077 if let Ok(batch) =
1078 queue.next_batch_timeout(Duration::from_millis(100))
1079 {
1080 batches_processed += 1;
1081 let batch_size = batch.len();
1082
1083 // Process each item in the batch
1084 for item in batch {
1085 local_consumed.insert(item);
1086 }
1087
1088 // Update total processed count
1089 let current = processed.fetch_add(batch_size, Ordering::SeqCst);
1090
1091 // Check if we've processed all expected items
1092 if current + batch_size >= total_expected_items {
1093 break;
1094 }
1095 } else if processed.load(Ordering::SeqCst) >= total_expected_items {
1096 // No more batches and we've processed all expected items
1097 break;
1098 }
1099
1100 // Occasionally check if we're done to avoid waiting for full timeout
1101 if processed.load(Ordering::SeqCst) >= total_expected_items {
1102 break;
1103 }
1104 }
1105
1106 // Record the items this consumer processed
1107 let mut global_consumed = consumed.lock();
1108 for item in local_consumed {
1109 global_consumed.insert(item);
1110 }
1111
1112 let elapsed = start_time.elapsed();
1113 consumer_timing.lock().push(elapsed);
1114
1115 println!(
1116 "Consumer {}: Processed {} batches in {:?}",
1117 consumer_id, batches_processed, elapsed
1118 );
1119 })
1120 })
1121 .collect();
1122
1123 // Start the test
1124 println!(
1125 "Starting stress test with {} producers and {} consumers",
1126 PRODUCER_COUNT, CONSUMER_COUNT
1127 );
1128 println!(
1129 "Each producer will generate {} items with batch size {}",
1130 ITEMS_PER_PRODUCER, BATCH_SIZE
1131 );
1132
1133 let overall_start = Instant::now();
1134 start_barrier.wait();
1135
1136 // Wait for all producers to finish
1137 for handle in producer_handles {
1138 handle.join().unwrap();
1139 }
1140
1141 println!("All producers finished");
1142
1143 // Wait for all consumers to finish
1144 for handle in consumer_handles {
1145 handle.join().unwrap();
1146 }
1147
1148 let overall_elapsed = overall_start.elapsed();
1149 println!("All consumers finished");
1150 println!("Overall test time: {:?}", overall_elapsed);
1151
1152 // Verify results
1153 let produced = all_produced_items.lock();
1154 let consumed = all_consumed_items.lock();
1155
1156 println!("Items produced: {}", produced.len());
1157 println!("Items consumed: {}", consumed.len());
1158
1159 // Verify all produced items were consumed
1160 assert_eq!(
1161 produced.len(),
1162 total_expected_items,
1163 "Number of produced items doesn't match expected"
1164 );
1165 assert_eq!(
1166 consumed.len(),
1167 total_expected_items,
1168 "Number of consumed items doesn't match expected"
1169 );
1170
1171 for item in produced.iter() {
1172 assert!(
1173 consumed.contains(item),
1174 "Item {} was produced but not consumed",
1175 item
1176 );
1177 }
1178
1179 // Calculate performance metrics
1180 let producer_times = producer_times.lock();
1181 let consumer_times = consumer_times.lock();
1182
1183 let avg_producer_time = producer_times.iter().map(|d| d.as_millis()).sum::<u128>()
1184 / producer_times.len() as u128;
1185
1186 let avg_consumer_time = consumer_times.iter().map(|d| d.as_millis()).sum::<u128>()
1187 / consumer_times.len() as u128;
1188
1189 let throughput =
1190 total_expected_items as f64 / (overall_elapsed.as_millis() as f64 / 1000.0);
1191
1192 println!("Average producer time: {}ms", avg_producer_time);
1193 println!("Average consumer time: {}ms", avg_consumer_time);
1194 println!("Throughput: {:.2} items/second", throughput);
1195 }
1196 }
1197 }
1198}