batch_aint_one/
error.rs

//! Errors that can occur while batching.
2
3use std::fmt::Display;
4
5use thiserror::Error;
6use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
7
8/// An error that occurred while trying to batch.
9#[derive(Error, Debug, Clone)]
10#[non_exhaustive]
11pub enum BatchError<E: Display> {
12    /// Something went wrong while submitting an input for processing.
13    ///
14    /// Unrecoverable.
15    #[error("Unable to send item to the worker for batching: channel closed")]
16    Tx,
17
18    /// Something went wrong while waiting for the output of a batch.
19    ///
20    /// Unrecoverable.
21    #[error("Error while waiting for batch results: channel closed. {0}")]
22    Rx(RecvError),
23
24    /// The current batch is full so the item was rejected.
25    ///
26    /// Recoverable.
27    #[error("Batch item rejected: {0}")]
28    Rejected(RejectionReason),
29
30    /// Something went wrong while processing a batch.
31    #[error("The entire batch failed: {0}")]
32    BatchFailed(E),
33
34    /// Something went wrong while acquiring resources for processing.
35    #[error("Resource acquisition failed: {0}")]
36    ResourceAcquisitionFailed(E),
37
38    /// The batch was cancelled before completion.
39    #[error("The batch was cancelled")]
40    Cancelled,
41
42    /// The batch processing (or resource acquisition) panicked.
43    #[error("The batch processing panicked")]
44    Panic,
45}
46
/// Reason for rejecting a batch item.
///
/// Its [`Display`] output is a short lowercase phrase suitable for embedding in a larger
/// error message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RejectionReason {
    /// The batch queue is full.
    ///
    /// The attached [`ConcurrencyStatus`] records whether concurrency was also exhausted.
    BatchQueueFull(ConcurrencyStatus),
}

/// Status of concurrency when rejecting a batch item.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConcurrencyStatus {
    /// There is available concurrency to process another batch.
    ///
    /// It might be being used because batches are waiting to be processed.
    Available,
    /// The maximum concurrency for this key has been reached.
    MaxedOut,
}

impl Display for RejectionReason {
    /// Writes a fixed, human-readable description of the rejection reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // One arm per (reason, status) pair and no wildcard, so adding a variant to either
        // enum is a compile error here until it gets its own message.
        let message = match self {
            Self::BatchQueueFull(ConcurrencyStatus::Available) => "the batch queue is full",
            Self::BatchQueueFull(ConcurrencyStatus::MaxedOut) => {
                "the batch queue is full and maximum concurrency reached"
            }
        };
        f.write_str(message)
    }
}
79
80/// Result type for batch operations.
81pub type BatchResult<T, E> = std::result::Result<T, BatchError<E>>;
82
83impl<E: Display> From<RecvError> for BatchError<E> {
84    fn from(rx_err: RecvError) -> Self {
85        BatchError::Rx(rx_err)
86    }
87}
88
89impl<T, E: Display> From<SendError<T>> for BatchError<E> {
90    fn from(_tx_err: SendError<T>) -> Self {
91        BatchError::Tx
92    }
93}
94
95impl<E> BatchError<E>
96where
97    E: Display,
98{
99    /// Get the inner error for general batch failures, otherwise self.
100    pub fn inner(self) -> BatchResult<E, E> {
101        match self {
102            BatchError::BatchFailed(source) => Ok(source),
103            _ => Err(self),
104        }
105    }
106}