batched_queue/
lib.rs

//! # `batched-queue`
//!
//! A high-performance, highly-concurrent batched queue implementation for Rust.
//!
//! `batched-queue` provides an efficient way to collect individual items into batches
//! for processing, which can significantly improve throughput in high-volume systems.
//!
//! ## Features
//!
//! - **Batching**: Automatically collects items into batches of a configurable size
//! - **Thread-safe**: Designed for concurrent usage with multiple producers and consumers
//! - **Backpressure**: Optional bounded queue to control memory usage
//! - **Flexible retrieval**: Blocking, non-blocking, and timeout-based batch retrieval
//! - **Multiple implementations**: Synchronous (default) and asynchronous modes via feature flags
//!
//! ## Usage
//!
//! By default, the crate provides a synchronous implementation:
//!
//! ```rust
//! use batched_queue::{BatchedQueue, BatchedQueueTrait};
//!
//! // Create a queue with batch size of 10
//! let queue = BatchedQueue::new(10).expect("Failed to create queue");
//!
//! // Create a sender that can be shared across threads
//! let sender = queue.create_sender();
//!
//! // Push items to the queue (in real usage, this would be in another thread)
//! for i in 0..25 {
//!     sender.push(i).expect("Failed to push item");
//! }
//!
//! // Flush any remaining items that haven't formed a complete batch
//! sender.flush().expect("Failed to flush");
//!
//! // Process batches
//! while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
//!     println!("Processing batch of {} items", batch.len());
//!     for item in batch {
//!         // Process each item
//!         println!("  Item: {}", item);
//!     }
//! }
//! ```
//!
//! ## Feature Flags
//!
//! - [`sync`] (default): Enables the synchronous implementation using [`parking_lot`] and [`crossbeam_channel`]
//! - `async`: Enables the asynchronous implementation using `tokio`

use std::time::Duration;
use thiserror::Error;

// Export the sync BatchedQueue by default
#[cfg(feature = "sync")]
pub use sync::BatchedQueue;

/// Error type for a batched queue.
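///
/// # Examples
///
/// A minimal sketch of constructing and matching on the error variants:
///
/// ```
/// use batched_queue::BatchedQueueError;
/// use std::time::Duration;
///
/// let err = BatchedQueueError::timeout(Duration::from_millis(50));
/// match err {
///     BatchedQueueError::Timeout(d) => assert_eq!(d, Duration::from_millis(50)),
///     BatchedQueueError::Disconnected => println!("all receivers dropped"),
///     other => println!("other error: {other}"),
/// }
/// ```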
#[derive(Error, Debug, Clone)]
pub enum BatchedQueueError {
    #[error("Channel is full (backpressure limit reached)")]
    ChannelFull,

    #[error("Channel is disconnected (all receivers dropped)")]
    Disconnected,

    #[error("Operation timed out after {0:?}")]
    Timeout(Duration),

    #[error("Queue capacity exceeded: tried to add more than {max_capacity} items")]
    CapacityExceeded { max_capacity: usize },

    #[error("Invalid batch size: {0}")]
    InvalidBatchSize(usize),

    #[error("Failed to send batch: {0}")]
    SendError(String),

    #[error("Failed to receive batch: {0}")]
    ReceiveError(String),
}

/// Contextual information about [`BatchedQueueError`].
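///
/// # Examples
///
/// An illustrative sketch; how this context gets attached to an error is left
/// to the caller:
///
/// ```
/// use batched_queue::ErrorContext;
///
/// let ctx = ErrorContext {
///     operation: "push".to_string(),
///     queue_info: "batch_size=10, bounded(5)".to_string(),
/// };
/// println!("`{}` failed on queue [{}]", ctx.operation, ctx.queue_info);
/// ```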
#[derive(Debug, Clone)]
pub struct ErrorContext {
    pub operation: String,
    pub queue_info: String,
}

impl BatchedQueueError {
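    /// Convenience constructor for [`BatchedQueueError::Timeout`].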
    pub fn timeout(duration: Duration) -> Self {
        BatchedQueueError::Timeout(duration)
    }

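    /// Convenience constructor for [`BatchedQueueError::CapacityExceeded`].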
    pub fn capacity_exceeded(max_capacity: usize) -> Self {
        BatchedQueueError::CapacityExceeded { max_capacity }
    }
}

/// Defines the common interface for batched queue implementations.
///
/// This trait provides methods for adding items to a queue, retrieving
/// batches of items, and checking queue status. All implementations must
/// handle the buffering of items until they form complete batches, and
/// provide mechanisms for flushing partial batches when needed.
///
/// # Examples
///
/// ```
/// use batched_queue::{BatchedQueue, BatchedQueueTrait};
///
/// // Create a queue with batch size of 10
/// let queue = BatchedQueue::new(10).expect("Failed to create queue");
///
/// // Create a sender that can be shared across threads
/// let sender = queue.create_sender();
///
/// // Push items to the queue (in real usage, this would be in another thread)
/// for i in 0..25 {
///     sender.push(i).expect("Failed to push item");
/// }
///
/// // Flush any remaining items that haven't formed a complete batch
/// sender.flush().expect("Failed to flush");
///
/// // Process batches
/// while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
///     println!("Processing batch of {} items", batch.len());
///     for item in batch {
///         // Process each item
///         println!("  Item: {}", item);
///     }
/// }
/// ```
pub trait BatchedQueueTrait<T> {
    /// Returns the total number of items pushed to the queue so far.
    fn len(&self) -> usize;

    /// Returns the maximum number of items a batch can hold.
    fn capacity(&self) -> usize;

    /// Returns `true` if the queue has no items waiting to be processed.
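    ///
    /// # Examples
    ///
    /// A minimal sketch of the expected behavior:
    ///
    /// ```
    /// use batched_queue::{BatchedQueue, BatchedQueueTrait};
    ///
    /// let queue = BatchedQueue::<i32>::new(4).expect("Failed to create queue");
    /// assert!(queue.is_empty());
    ///
    /// let sender = queue.create_sender();
    /// sender.push(1).expect("Failed to push item");
    /// assert!(!queue.is_empty());
    /// ```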
    fn is_empty(&self) -> bool;

    /// Adds an item to the queue.
    ///
    /// If adding this item causes the current batch to reach the configured
    /// batch size, the batch will be automatically sent for processing.
    ///
    /// # Arguments
    ///
    /// * `item` - The item to add to the queue
    ///
    /// # Errors
    ///
    /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped,
    /// or other implementation-specific errors.
    fn push(&self, item: T) -> Result<(), BatchedQueueError>;

    /// Attempts to retrieve the next batch of items without blocking.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(batch))` - A batch of items is available
    /// * `Ok(None)` - No batch is currently available
    ///
    /// # Errors
    ///
    /// Returns `BatchedQueueError::Disconnected` if the sending end has been dropped,
    /// or other implementation-specific errors.
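    ///
    /// # Examples
    ///
    /// A quick sketch of the non-blocking contract:
    ///
    /// ```
    /// use batched_queue::{BatchedQueue, BatchedQueueTrait};
    ///
    /// let queue = BatchedQueue::<i32>::new(3).expect("Failed to create queue");
    /// // Nothing has been pushed yet, so no batch is available.
    /// assert!(queue.try_next_batch().expect("channel open").is_none());
    /// ```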
    fn try_next_batch(&self) -> Result<Option<Vec<T>>, BatchedQueueError>;

    /// Retrieves the next batch of items, blocking until one is available.
    ///
    /// # Errors
    ///
    /// Returns `BatchedQueueError::Disconnected` if the sending end has been dropped,
    /// or other implementation-specific errors.
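    ///
    /// # Examples
    ///
    /// A minimal sketch; a real consumer would typically block in its own thread:
    ///
    /// ```
    /// use batched_queue::{BatchedQueue, BatchedQueueTrait};
    ///
    /// let queue = BatchedQueue::<i32>::new(2).expect("Failed to create queue");
    /// let sender = queue.create_sender();
    ///
    /// sender.push(1).expect("Failed to push item");
    /// sender.push(2).expect("Failed to push item"); // completes a batch of 2
    ///
    /// let batch = queue.next_batch().expect("Failed to get batch");
    /// assert_eq!(batch, vec![1, 2]);
    /// ```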
    fn next_batch(&self) -> Result<Vec<T>, BatchedQueueError>;

    /// Retrieves the next batch of items, blocking until one is available or until the timeout expires.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum time to wait for a batch to become available
    ///
    /// # Errors
    ///
    /// Returns:
    /// * `BatchedQueueError::Timeout` if no batch becomes available within the timeout period
    /// * `BatchedQueueError::Disconnected` if the sending end has been dropped
    /// * Other implementation-specific errors
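    ///
    /// # Examples
    ///
    /// A minimal sketch of handling the timeout case:
    ///
    /// ```
    /// use batched_queue::{BatchedQueue, BatchedQueueTrait, BatchedQueueError};
    /// use std::time::Duration;
    ///
    /// let queue = BatchedQueue::<i32>::new(5).expect("Failed to create queue");
    ///
    /// // No items have been pushed, so this times out.
    /// match queue.next_batch_timeout(Duration::from_millis(10)) {
    ///     Err(BatchedQueueError::Timeout(_)) => println!("no batch yet"),
    ///     Ok(batch) => println!("got {} items", batch.len()),
    ///     Err(e) => eprintln!("queue error: {e}"),
    /// }
    /// ```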
    fn next_batch_timeout(&self, timeout: std::time::Duration)
    -> Result<Vec<T>, BatchedQueueError>;

    /// Flushes any pending items into a batch, even if the batch is not full.
    ///
    /// This is useful for ensuring that all items are processed, especially
    /// during shutdown or when batches need to be processed on demand.
    ///
    /// # Errors
    ///
    /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped,
    /// or other implementation-specific errors.
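    ///
    /// # Examples
    ///
    /// A minimal sketch of flushing a partial batch:
    ///
    /// ```
    /// use batched_queue::{BatchedQueue, BatchedQueueTrait};
    ///
    /// let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
    /// let sender = queue.create_sender();
    ///
    /// sender.push(42).expect("Failed to push item");
    ///
    /// // Only one item is buffered; force it out as a partial batch.
    /// queue.flush().expect("Failed to flush");
    /// assert_eq!(queue.next_batch().expect("Failed to get batch"), vec![42]);
    /// ```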
    fn flush(&self) -> Result<(), BatchedQueueError>;
}

#[cfg(feature = "sync")]
pub mod sync {
    //! Synchronous implementation of the batched queue.
    //!
    //! This module provides a thread-safe implementation using [`crossbeam_channel`]
    //! for communication and [`parking_lot::Mutex`] for low-contention locking.
    //! It is designed for high-throughput scenarios where items need to be
    //! processed in batches.

    use super::*;
    use crossbeam_channel as channel;
    use parking_lot::Mutex;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// A thread-safe, high-performance queue that automatically batches items.
    ///
    /// [`BatchedQueue`] collects individual items until reaching the configured batch size,
    /// then automatically makes the batch available for processing. This batching approach
    /// can significantly improve throughput in high-volume systems by reducing overhead.
    ///
    /// # Examples
    ///
    /// ```
    /// use batched_queue::{BatchedQueue, BatchedQueueTrait};
    ///
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// // Create a queue with batch size of 5
    /// let queue = BatchedQueue::new(5).expect("Failed to create queue");
    ///
    /// // Create a sender that can be shared across threads
    /// let sender = queue.create_sender();
    ///
    /// // Producer thread
    /// let producer = thread::spawn(move || {
    ///     for i in 0..20 {
    ///         sender.push(i).expect("Failed to push item");
    ///         thread::sleep(Duration::from_millis(10));
    ///     }
    ///     sender.flush().expect("Failed to flush"); // Send any remaining items
    /// });
    ///
    /// // Consumer thread
    /// let consumer = thread::spawn(move || {
    ///     let mut all_items = Vec::new();
    ///
    ///     // Process batches as they become available
    ///     while all_items.len() < 20 {
    ///         if let Ok(batch) = queue.next_batch_timeout(Duration::from_millis(100)) {
    ///             all_items.extend(batch);
    ///         }
    ///     }
    ///
    ///     all_items
    /// });
    ///
    /// // Wait for threads to complete
    /// producer.join().unwrap();
    /// let result = consumer.join().unwrap();
    ///
    /// assert_eq!(result.len(), 20);
    /// ```
    pub struct BatchedQueue<T> {
        batch_size: usize,
        current_batch: Arc<Mutex<Vec<T>>>,
        batch_receiver: channel::Receiver<Vec<T>>,
        batch_sender: channel::Sender<Vec<T>>,
        item_count: Arc<AtomicUsize>,
    }

    impl<T: Send + 'static> BatchedQueue<T> {
        /// Creates a new batched queue with the specified batch size and an unbounded channel.
        ///
        /// # Arguments
        ///
        /// * `batch_size` - The number of items to collect before forming a batch
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        ///
        /// let queue = BatchedQueue::<String>::new(10).expect("Failed to create queue");
        /// ```
        pub fn new(batch_size: usize) -> Result<Self, BatchedQueueError> {
            if batch_size == 0 {
                return Err(BatchedQueueError::InvalidBatchSize(batch_size));
            }

            let (batch_sender, batch_receiver) = channel::unbounded();
            Ok(Self {
                batch_size,
                current_batch: Arc::new(Mutex::new(Vec::with_capacity(batch_size))),
                batch_receiver,
                batch_sender,
                item_count: Arc::new(AtomicUsize::new(0)),
            })
        }

        /// Creates a new batched queue with a bounded channel for backpressure.
        ///
        /// Using a bounded channel helps control memory usage by limiting the number
        /// of batches that can be queued at once. When the channel is full, producers
        /// will block when attempting to send a full batch.
        ///
        /// # Arguments
        ///
        /// * `batch_size` - The number of items to collect before forming a batch
        /// * `max_batches` - The maximum number of batches that can be queued
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        ///
        /// // Create a queue with batch size 10 and at most 5 batches in the channel
        /// let queue = BatchedQueue::<i32>::new_bounded(10, 5).expect("Failed to create queue");
        /// ```
        pub fn new_bounded(
            batch_size: usize,
            max_batches: usize,
        ) -> Result<Self, BatchedQueueError> {
            if batch_size == 0 {
                return Err(BatchedQueueError::InvalidBatchSize(batch_size));
            }
            if max_batches == 0 {
                return Err(BatchedQueueError::InvalidBatchSize(max_batches));
            }

            let (batch_sender, batch_receiver) = channel::bounded(max_batches);
            Ok(Self {
                batch_size,
                current_batch: Arc::new(Mutex::new(Vec::with_capacity(batch_size))),
                batch_receiver,
                batch_sender,
                item_count: Arc::new(AtomicUsize::new(0)),
            })
        }

        /// Creates a sender for this queue that can be shared across threads.
        ///
        /// Multiple senders can be created from a single queue, allowing
        /// for concurrent producers.
        ///
        /// # Returns
        ///
        /// A new [`BatchedQueueSender`] linked to this queue
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        /// use std::thread;
        ///
        /// let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
        ///
        /// // Create multiple senders for different threads
        /// let sender1 = queue.create_sender();
        /// let sender2 = queue.create_sender();
        ///
        /// // Use senders in different threads
        /// let t1 = thread::spawn(move || {
        ///     for i in 0..10 {
        ///         sender1.push(i).expect("Failed to push item");
        ///     }
        /// });
        ///
        /// let t2 = thread::spawn(move || {
        ///     for i in 10..20 {
        ///         sender2.push(i).expect("Failed to push item");
        ///     }
        /// });
        ///
        /// // Wait for producers to finish
        /// t1.join().unwrap();
        /// t2.join().unwrap();
        /// ```
        pub fn create_sender(&self) -> BatchedQueueSender<T> {
            BatchedQueueSender {
                batch_size: self.batch_size,
                current_batch: self.current_batch.clone(),
                batch_sender: self.batch_sender.clone(),
                item_count: self.item_count.clone(),
            }
        }

        /// Takes any items left in the current batch and returns them when shutting down.
        ///
        /// This method is useful during controlled shutdown to collect any remaining items
        /// that haven't formed a complete batch.
        ///
        /// # Returns
        ///
        /// A vector containing any items that were in the current batch
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        ///
        /// let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
        /// let sender = queue.create_sender();
        ///
        /// // Add some items, but not enough to form a complete batch
        /// for i in 0..3 {
        ///     sender.push(i).expect("Failed to push item");
        /// }
        ///
        /// // Close the queue and get remaining items
        /// let remaining = queue.close_queue();
        /// assert_eq!(remaining.len(), 3);
        /// ```
        pub fn close_queue(&self) -> Vec<T> {
            // Take any items left in the current batch
            let mut batch = self.current_batch.lock();
            std::mem::take(&mut *batch)
        }
    }

    impl<T: Send + 'static> BatchedQueueTrait<T> for BatchedQueue<T> {
        fn push(&self, item: T) -> Result<(), BatchedQueueError> {
            let mut full_batch = None;

            {
                let mut batch = self.current_batch.lock();
                batch.push(item);
                self.item_count.fetch_add(1, Ordering::SeqCst);

                // Checking the buffer length (rather than a running modulo count)
                // keeps batch boundaries correct even after a partial flush.
                if batch.len() >= self.batch_size {
                    // Take the batch but minimize time in the critical section
                    full_batch = Some(std::mem::replace(
                        &mut *batch,
                        Vec::with_capacity(self.batch_size),
                    ));
                }
            }

            if let Some(batch) = full_batch {
                self.batch_sender
                    .send(batch)
                    .map_err(|_| BatchedQueueError::Disconnected)?;
            }

            Ok(())
        }

        fn try_next_batch(&self) -> Result<Option<Vec<T>>, BatchedQueueError> {
            match self.batch_receiver.try_recv() {
                Ok(batch) => Ok(Some(batch)),
                Err(channel::TryRecvError::Empty) => Ok(None),
                Err(channel::TryRecvError::Disconnected) => Err(BatchedQueueError::Disconnected),
            }
        }

        fn next_batch(&self) -> Result<Vec<T>, BatchedQueueError> {
            self.batch_receiver
                .recv()
                .map_err(|_| BatchedQueueError::Disconnected)
        }

        fn next_batch_timeout(
            &self,
            timeout: std::time::Duration,
        ) -> Result<Vec<T>, BatchedQueueError> {
            match self.batch_receiver.recv_timeout(timeout) {
                Ok(batch) => Ok(batch),
                Err(channel::RecvTimeoutError::Timeout) => Err(BatchedQueueError::Timeout(timeout)),
                Err(channel::RecvTimeoutError::Disconnected) => {
                    Err(BatchedQueueError::Disconnected)
                }
            }
        }

        fn len(&self) -> usize {
            self.item_count.load(Ordering::SeqCst)
        }

        fn capacity(&self) -> usize {
            self.batch_size
        }

        fn flush(&self) -> Result<(), BatchedQueueError> {
            let mut batch = self.current_batch.lock();
            if !batch.is_empty() {
                let partial_batch =
                    std::mem::replace(&mut *batch, Vec::with_capacity(self.batch_size));
                self.batch_sender
                    .send(partial_batch)
                    .map_err(|_| BatchedQueueError::Disconnected)?;
            }
            Ok(())
        }

        fn is_empty(&self) -> bool {
            self.batch_receiver.is_empty() && self.current_batch.lock().is_empty()
        }
    }

    /// A sender handle for adding items to a batched queue.
    ///
    /// `BatchedQueueSender` provides methods to add items to a batched queue from multiple
    /// threads. It handles the details of batch management and automatic flushing of batches
    /// when they reach the configured size.
    ///
    /// # Examples
    ///
    /// ```
    /// use batched_queue::BatchedQueue;
    /// use std::thread;
    ///
    /// let queue = BatchedQueue::<String>::new(5).expect("Failed to create queue");
    /// let sender = queue.create_sender();
    ///
    /// // Share the sender with another thread
    /// thread::spawn(move || {
    ///     for i in 0..10 {
    ///         sender.push(format!("Item {}", i)).expect("Failed to push item");
    ///     }
    ///
    ///     // Ensure any remaining items are sent
    ///     sender.flush().expect("Failed to flush");
    /// });
    /// ```
    pub struct BatchedQueueSender<T> {
        batch_size: usize,
        current_batch: Arc<Mutex<Vec<T>>>,
        batch_sender: channel::Sender<Vec<T>>,
        item_count: Arc<AtomicUsize>,
    }

    impl<T: Send + 'static> Clone for BatchedQueueSender<T> {
        fn clone(&self) -> Self {
            Self {
                batch_size: self.batch_size,
                current_batch: self.current_batch.clone(),
                batch_sender: self.batch_sender.clone(),
                item_count: self.item_count.clone(),
            }
        }
    }

    impl<T: Send + 'static> BatchedQueueSender<T> {
        /// Adds an item to the queue.
        ///
        /// If adding this item causes the current batch to reach the configured
        /// batch size, the batch will be automatically sent for processing.
        /// This method will block if the channel is bounded and full.
        ///
        /// # Arguments
        ///
        /// * `item` - The item to add to the queue
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        ///
        /// let queue = BatchedQueue::<i32>::new(5).expect("Failed to create queue");
        /// let sender = queue.create_sender();
        ///
        /// for i in 0..10 {
        ///     sender.push(i).expect("Failed to push item");
        /// }
        /// ```
        pub fn push(&self, item: T) -> Result<(), BatchedQueueError> {
            let mut full_batch = None;

            {
                let mut batch = self.current_batch.lock();
                batch.push(item);
                self.item_count.fetch_add(1, Ordering::SeqCst);

                // Checking the buffer length (rather than a running modulo count)
                // keeps batch boundaries correct even after a partial flush.
                if batch.len() >= self.batch_size {
                    // Take the batch but minimize time in the critical section
                    full_batch = Some(std::mem::replace(
                        &mut *batch,
                        Vec::with_capacity(self.batch_size),
                    ));
                }
            }

            if let Some(batch) = full_batch {
                self.batch_sender
                    .send(batch)
                    .map_err(|_| BatchedQueueError::Disconnected)?;
            }

            Ok(())
        }

        /// Attempts to add an item to the queue without blocking.
        ///
        /// This method is similar to `push`, but it will not block if the channel
        /// is bounded and full. Instead, if a full batch cannot be sent because
        /// the channel is full, the batch is kept in the current batch and will
        /// be sent on a future push or flush operation.
        ///
        /// # Arguments
        ///
        /// * `item` - The item to add to the queue
        ///
        /// # Errors
        ///
        /// Returns `BatchedQueueError::ChannelFull` if a completed batch could not
        /// be sent because the channel is full, or `BatchedQueueError::Disconnected`
        /// if the receiving end has been dropped.
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        ///
        /// // Create a queue with limited capacity
        /// let queue = BatchedQueue::<i32>::new_bounded(5, 1).expect("Failed to create queue");
        /// let sender = queue.create_sender();
        ///
        /// for i in 0..20 {
        ///     // `ChannelFull` errors are ignored here; the items stay buffered.
        ///     let _ = sender.try_push(i);
        /// }
        /// ```
        pub fn try_push(&self, item: T) -> Result<(), BatchedQueueError> {
            let mut full_batch = None;

            {
                let mut batch = self.current_batch.lock();
                batch.push(item);
                self.item_count.fetch_add(1, Ordering::SeqCst);

                // Checking the buffer length (rather than a running modulo count)
                // keeps batch boundaries correct even after a partial flush; `>=`
                // also retries oversized buffers left behind by a failed send.
                if batch.len() >= self.batch_size {
                    // Take the batch but minimize time in the critical section
                    full_batch = Some(std::mem::replace(
                        &mut *batch,
                        Vec::with_capacity(self.batch_size),
                    ));
                }
            }

            if let Some(batch_to_send) = full_batch {
                match self.batch_sender.try_send(batch_to_send) {
                    Ok(()) => {}
                    Err(channel::TrySendError::Full(mut returned_batch)) => {
                        // The channel is full, so restore the unsent batch. Other
                        // senders may have pushed items while the lock was released;
                        // keep those newer items after the restored (older) ones.
                        let mut batch = self.current_batch.lock();
                        returned_batch.append(&mut *batch);
                        *batch = returned_batch;
                        return Err(BatchedQueueError::ChannelFull);
                    }
                    Err(channel::TrySendError::Disconnected(_)) => {
                        return Err(BatchedQueueError::Disconnected);
                    }
                }
            }

            Ok(())
        }

        /// Flushes any pending items into a batch, even if the batch is not full.
        ///
        /// This method will block if the channel is bounded and full.
        ///
        /// # Errors
        ///
        /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped.
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        ///
        /// let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
        /// let sender = queue.create_sender();
        ///
        /// // Add some items, but not enough to form a complete batch
        /// for i in 0..3 {
        ///     sender.push(i).expect("Failed to push item");
        /// }
        ///
        /// // Flush to ensure items are sent for processing
        /// sender.flush().expect("Failed to flush");
        /// ```
        pub fn flush(&self) -> Result<(), BatchedQueueError> {
            let mut batch = self.current_batch.lock();
            if !batch.is_empty() {
                let partial_batch =
                    std::mem::replace(&mut *batch, Vec::with_capacity(self.batch_size));
                self.batch_sender
                    .send(partial_batch)
                    .map_err(|_| BatchedQueueError::Disconnected)?;
            }
            Ok(())
        }

        /// Attempts to flush any pending items without blocking.
        ///
        /// # Errors
        ///
        /// Returns `BatchedQueueError::Disconnected` if the receiving end has been dropped,
        /// or `BatchedQueueError::ChannelFull` if the channel is full. On `ChannelFull`,
        /// the pending items remain buffered and can be flushed again later.
        ///
        /// # Examples
        ///
        /// ```
        /// use batched_queue::BatchedQueue;
        ///
        /// // Create a queue with limited capacity
        /// let queue = BatchedQueue::<i32>::new_bounded(5, 1).expect("Failed to create queue");
        /// let sender = queue.create_sender();
        ///
        /// for i in 0..3 {
        ///     sender.push(i).expect("Failed to push item");
        /// }
        ///
        /// // Try to flush without blocking
        /// if sender.try_flush().is_err() {
        ///     println!("Channel is full, will try again later");
        /// }
        /// ```
        pub fn try_flush(&self) -> Result<(), BatchedQueueError> {
            let mut batch = self.current_batch.lock();
            if !batch.is_empty() {
                let partial_batch =
                    std::mem::replace(&mut *batch, Vec::with_capacity(self.batch_size));
                match self.batch_sender.try_send(partial_batch) {
                    Ok(_) => Ok(()),
                    Err(channel::TrySendError::Full(returned_batch)) => {
                        // The channel is full; restore the unsent items so they are
                        // not lost. The lock is still held, so the buffer is empty.
                        *batch = returned_batch;
                        Err(BatchedQueueError::ChannelFull)
                    }
                    Err(channel::TrySendError::Disconnected(_)) => {
                        Err(BatchedQueueError::Disconnected)
                    }
                }
            } else {
                Ok(())
            }
        }
    }

    // For testing thread-safe behavior
    #[cfg(test)]
    mod tests {
        use super::*;
        use std::thread;
        use std::time::Duration;

        #[test]
        fn multithreaded() {
            let queue = BatchedQueue::<i32>::new(10).expect("Failed to create queue");
            let sender1 = queue.create_sender();
            let sender2 = queue.create_sender();

            // Thread 1: Push numbers 0-49
            let t1 = thread::spawn(move || {
                for i in 0..50 {
                    sender1.push(i).expect("Failed to push item");
                    thread::sleep(Duration::from_millis(1));
                }
                sender1.flush().expect("Failed to flush");
            });

            // Thread 2: Push numbers 100-149
            let t2 = thread::spawn(move || {
                for i in 100..150 {
                    sender2.push(i).expect("Failed to push item");
                    thread::sleep(Duration::from_millis(1));
                }
                sender2.flush().expect("Failed to flush");
            });

            // Consumer thread: Collect all batches
            let t3 = thread::spawn(move || {
                let mut all_items = Vec::new();

                // Collect for a reasonable amount of time
                for _ in 0..15 {
                    if let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
                        all_items.extend(batch);
                    }
                    thread::sleep(Duration::from_millis(10));
                }

                // Make sure we got all remaining batches
                while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
                    all_items.extend(batch);
                }

                all_items
            });

            // Wait for producer threads to finish
            t1.join().unwrap();
            t2.join().unwrap();

            // Get results from consumer
            let result = t3.join().unwrap();

            // Verify we have all 100 items
            assert_eq!(result.len(), 100);

            // Check that we have all numbers
            let mut result_sorted = result.clone();
            result_sorted.sort();

            // Verify we got all numbers 0-49 and 100-149
            for i in 0..50 {
                assert!(result_sorted.contains(&i));
                assert!(result_sorted.contains(&(i + 100)));
            }
        }

        #[test]
        fn timeout() {
            let queue = BatchedQueue::<i32>::new(5).expect("Failed to create queue");
            let sender = queue.create_sender();

            // Add 3 items (not enough to trigger a batch)
            for i in 1..4 {
                sender.push(i).unwrap();
            }

            // Try to get a batch with a short timeout - should time out
            let result = queue.next_batch_timeout(Duration::from_millis(10));
            assert!(result.is_err());

            // Now flush the incomplete batch
            sender.flush().unwrap();

            // Should get the batch now
            let batch = queue.next_batch_timeout(Duration::from_millis(10)).unwrap();
            assert_eq!(batch, vec![1, 2, 3]);
        }

        #[test]
        fn bounded_channel() {
            // Create a bounded queue with batch size 5 and max 2 batches in the channel
            let queue = BatchedQueue::new_bounded(5, 2).expect("Failed to create queue");
            let sender = queue.create_sender();

            // Producer thread
            let handle = thread::spawn(move || {
                let mut successful_pushes = 0;
                // Try to push 20 items
                for item_idx in 0..20 {
                    // Use push which will block if the channel is full
                    sender.push(item_idx).expect("Failed to push item");
                    successful_pushes += 1;

                    // Add a small delay
                    if item_idx % 5 == 4 {
                        // Every 5th item, wait a bit longer
                        thread::sleep(Duration::from_millis(5));
                    }
                }
                sender.flush().expect("Failed to flush");
                successful_pushes
            });

            // Consumer thread - retrieve batches to prevent deadlock
            let mut received_batches = 0;
            let mut all_items = Vec::new();

            // Receive batches while the producer is running
            while received_batches < 4 {
                // Expect 4 full batches of 5 items each
                if let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
                    received_batches += 1;
                    all_items.extend(batch);
                }
                thread::sleep(Duration::from_millis(5));
            }

            // Wait for producer to finish
            let successful_pushes = handle.join().unwrap();

            // Receive any remaining batches
            while let Some(batch) = queue.try_next_batch().expect("Failed to get batch") {
                all_items.extend(batch);
            }

            // Should have all 20 items
            assert_eq!(all_items.len(), 20);
            assert_eq!(successful_pushes, 20);

            // Verify we have all numbers 0-19
            let mut sorted_items = all_items.clone();
            sorted_items.sort();
            for i in 0..20 {
                assert!(sorted_items.contains(&i));
            }
        }

        #[test]
        fn backpressure() {
            // Create a bounded queue with backpressure
            let queue = BatchedQueue::new_bounded(5, 1).expect("Failed to create queue"); // Only 1 batch in the channel
            let sender = queue.create_sender();

            // Fill the first batch and send it
            for i in 0..5 {
                sender.push(i).expect("Failed to push item");
            }
            // Now the batch is automatically sent because it's full

            // Create a partial second batch
            for i in 5..8 {
                sender.push(i).expect("Failed to push item");
            }

            // At this point, we have one full batch in the channel and a partial batch in current_batch

            // Get the first batch to make room in the channel
            let batch = queue.next_batch().expect("Failed to get batch");
            assert_eq!(batch, vec![0, 1, 2, 3, 4]);

            // Now flush the partial batch - this should succeed
            assert!(sender.try_flush().is_ok());

            // And we should get the second batch
            let batch = queue
                .next_batch_timeout(Duration::from_millis(50))
                .expect("Failed to get batch");
            assert_eq!(batch, vec![5, 6, 7]);
        }

        #[test]
        fn error_handling() {
            // Test invalid batch size
            let invalid_queue = BatchedQueue::<i32>::new(0);
            assert!(matches!(
                invalid_queue,
                Err(BatchedQueueError::InvalidBatchSize(0))
            ));

            // Create a queue with very limited capacity - only 1 batch in the channel
            let limited_queue = BatchedQueue::new_bounded(5, 1).expect("Failed to create queue");
            let limited_sender = limited_queue.create_sender();

            // Fill the channel with one complete batch
            for i in 0..5 {
                limited_sender
                    .push(i)
                    .expect("Should succeed for first batch");
            }

            // At this point, we have one full batch in the channel
            // Now, add items to start building a second batch
            for i in 5..9 {
                limited_sender
                    .push(i)
                    .expect("Should succeed as we're building a partial batch");
            }

            // Try to complete the second batch, which should fail with ChannelFull
            // because when it completes, it immediately tries to send it
            let result = limited_sender.try_push(9);
            assert!(matches!(result, Err(BatchedQueueError::ChannelFull)));

            // Now let's test the timeout
            // First ensure there's nothing ready in the queue by consuming the batch
            limited_queue
                .next_batch()
                .expect("Should get the first batch");

            // Now we should have nothing in the channel and only a partial batch
            // Try to get a batch with a very short timeout
            let result = limited_queue.next_batch_timeout(Duration::from_millis(1));
            assert!(matches!(result, Err(BatchedQueueError::Timeout(_))));
        }

        #[cfg(test)]
        mod stress_tests {
            use super::*;
            use std::collections::HashSet;
            use std::sync::atomic::{AtomicUsize, Ordering};
            use std::sync::{Arc, Barrier};
            use std::thread;
            use std::time::{Duration, Instant};

            #[test]
            fn batched_queue() {
                // Configuration parameters
                const BATCH_SIZE: usize = 100;
                const CHANNEL_CAPACITY: usize = 10;
                const PRODUCER_COUNT: usize = 64;
                const ITEMS_PER_PRODUCER: usize = 10_000;
                const CONSUMER_COUNT: usize = 4;

                // Wrap the queue in an Arc so it can be safely shared between threads
                let queue = Arc::new(
                    BatchedQueue::new_bounded(BATCH_SIZE, CHANNEL_CAPACITY)
                        .expect("Failed to create queue"),
                );

                // Setup synchronization primitives
                let start_barrier = Arc::new(Barrier::new(PRODUCER_COUNT + CONSUMER_COUNT + 1));
                let total_expected_items = PRODUCER_COUNT * ITEMS_PER_PRODUCER;
                let processed_items = Arc::new(AtomicUsize::new(0));

                // Tracking data structures for verification
                let all_produced_items = Arc::new(parking_lot::Mutex::new(HashSet::new()));
                let all_consumed_items = Arc::new(parking_lot::Mutex::new(HashSet::new()));

                // Track performance metrics
                let producer_times = Arc::new(parking_lot::Mutex::new(Vec::new()));
                let consumer_times = Arc::new(parking_lot::Mutex::new(Vec::new()));

                // Create and launch producer threads
                let producer_handles: Vec<_> = (0..PRODUCER_COUNT)
                    .map(|producer_id| {
                        let queue_sender = queue.create_sender();
                        let start = start_barrier.clone();
                        let produced = all_produced_items.clone();
                        let producer_timing = producer_times.clone();

                        thread::spawn(move || {
                            // Wait for all threads to be ready
                            start.wait();
                            let start_time = Instant::now();

                            // Producer offset ensures each producer generates unique values
                            let offset = producer_id * ITEMS_PER_PRODUCER;

                            // Track items we produced in this thread
                            let mut local_produced = HashSet::new();

                            for i in 0..ITEMS_PER_PRODUCER {
                                let item = offset + i;
                                queue_sender.push(item).expect("Failed to push item");
                                local_produced.insert(item);

                                // Occasionally sleep to create more contention patterns
                                if i % 1000 == 0 {
                                    thread::sleep(Duration::from_micros(10));
                                }
                            }

                            // Ensure final batch is sent
                            queue_sender.flush().expect("Failed to flush");

                            // Record items this producer generated
                            let mut global_produced = produced.lock();
                            for item in local_produced {
                                global_produced.insert(item);
                            }

                            let elapsed = start_time.elapsed();
                            producer_timing.lock().push(elapsed);

                            println!("Producer {}: Finished in {:?}", producer_id, elapsed);
                        })
                    })
                    .collect();

                // Create and launch consumer threads
                let consumer_handles: Vec<_> = (0..CONSUMER_COUNT)
                    .map(|consumer_id| {
                        let queue = queue.clone(); // Clone the Arc, not the queue itself
                        let start = start_barrier.clone();
                        let processed = processed_items.clone();
                        let consumed = all_consumed_items.clone();
                        let consumer_timing = consumer_times.clone();

                        thread::spawn(move || {
                            // Wait for all threads to be ready
                            start.wait();
                            let start_time = Instant::now();

                            // Track items consumed by this thread
                            let mut local_consumed = HashSet::new();
                            let mut batches_processed = 0;

                            loop {
                                // Try to get a batch with timeout
                                if let Ok(batch) =
                                    queue.next_batch_timeout(Duration::from_millis(100))
                                {
                                    batches_processed += 1;
                                    let batch_size = batch.len();

                                    // Process each item in the batch
                                    for item in batch {
                                        local_consumed.insert(item);
                                    }

                                    // Update total processed count
                                    let current = processed.fetch_add(batch_size, Ordering::SeqCst);

                                    // Check if we've processed all expected items
                                    if current + batch_size >= total_expected_items {
                                        break;
                                    }
                                } else if processed.load(Ordering::SeqCst) >= total_expected_items {
                                    // No more batches and we've processed all expected items
                                    break;
                                }

                                // Occasionally check if we're done to avoid waiting for full timeout
                                if processed.load(Ordering::SeqCst) >= total_expected_items {
                                    break;
                                }
                            }

                            // Record the items this consumer processed
                            let mut global_consumed = consumed.lock();
                            for item in local_consumed {
                                global_consumed.insert(item);
                            }

                            let elapsed = start_time.elapsed();
                            consumer_timing.lock().push(elapsed);

                            println!(
                                "Consumer {}: Processed {} batches in {:?}",
                                consumer_id, batches_processed, elapsed
                            );
                        })
                    })
                    .collect();

                // Start the test
                println!(
                    "Starting stress test with {} producers and {} consumers",
                    PRODUCER_COUNT, CONSUMER_COUNT
                );
                println!(
                    "Each producer will generate {} items with batch size {}",
                    ITEMS_PER_PRODUCER, BATCH_SIZE
                );

                let overall_start = Instant::now();
                start_barrier.wait();

                // Wait for all producers to finish
                for handle in producer_handles {
                    handle.join().unwrap();
                }

                println!("All producers finished");

                // Wait for all consumers to finish
                for handle in consumer_handles {
                    handle.join().unwrap();
                }

                let overall_elapsed = overall_start.elapsed();
                println!("All consumers finished");
                println!("Overall test time: {:?}", overall_elapsed);

                // Verify results
                let produced = all_produced_items.lock();
                let consumed = all_consumed_items.lock();

                println!("Items produced: {}", produced.len());
                println!("Items consumed: {}", consumed.len());

                // Verify all produced items were consumed
                assert_eq!(
                    produced.len(),
                    total_expected_items,
                    "Number of produced items doesn't match expected"
                );
                assert_eq!(
                    consumed.len(),
                    total_expected_items,
                    "Number of consumed items doesn't match expected"
                );

                for item in produced.iter() {
                    assert!(
                        consumed.contains(item),
                        "Item {} was produced but not consumed",
                        item
                    );
                }

                // Calculate performance metrics
                let producer_times = producer_times.lock();
                let consumer_times = consumer_times.lock();

                let avg_producer_time = producer_times.iter().map(|d| d.as_millis()).sum::<u128>()
                    / producer_times.len() as u128;

                let avg_consumer_time = consumer_times.iter().map(|d| d.as_millis()).sum::<u128>()
                    / consumer_times.len() as u128;

                let throughput =
                    total_expected_items as f64 / (overall_elapsed.as_millis() as f64 / 1000.0);

                println!("Average producer time: {}ms", avg_producer_time);
                println!("Average consumer time: {}ms", avg_consumer_time);
                println!("Throughput: {:.2} items/second", throughput);
            }
        }
    }
}