batch_aint_one/
error.rs

1use std::fmt::Display;
2
3use thiserror::Error;
4use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
5
6/// An error that occurred while trying to batch.
7#[derive(Error, Debug, Clone)]
8#[non_exhaustive]
9pub enum BatchError<E: Display> {
10    /// Something went wrong while submitting an input for processing.
11    ///
12    /// Unrecoverable.
13    #[error("Unable to send item to the worker for batching: channel closed")]
14    Tx,
15
16    /// Something went wrong while waiting for the output of a batch.
17    ///
18    /// Unrecoverable.
19    #[error("Error while waiting for batch results: channel closed. {0}")]
20    Rx(RecvError),
21
22    /// The current batch is full so the item was rejected.
23    ///
24    /// Recoverable.
25    #[error("Batch item rejected: {0}")]
26    Rejected(RejectionReason),
27
28    /// Something went wrong while processing a batch.
29    #[error("The entire batch failed: {0}")]
30    BatchFailed(E),
31
32    /// Something went wrong while acquiring resources for processing.
33    #[error("Resource acquisition failed: {0}")]
34    ResourceAcquisitionFailed(E),
35
36    /// The batch was cancelled before completion.
37    #[error("The batch was cancelled")]
38    Cancelled,
39
40    /// The batch processing (or resource acquisition) panicked.
41    #[error("The batch processing panicked")]
42    Panic,
43}
44
45#[derive(Debug, Clone, Copy)]
46#[non_exhaustive]
47pub enum RejectionReason {
48    /// The batch queue is full.
49    BatchQueueFull(ConcurrencyStatus),
50}
51
/// Describes whether concurrency is available for processing another batch.
#[non_exhaustive]
#[derive(Clone, Copy, Debug)]
pub enum ConcurrencyStatus {
    /// Concurrency is available to process another batch.
    ///
    /// It may nevertheless be in use, because batches are waiting to be processed.
    Available,
    /// This key has reached its maximum concurrency.
    MaxedOut,
}
62
63impl Display for RejectionReason {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.write_str(match self {
66            RejectionReason::BatchQueueFull(concurrency) => match concurrency {
67                ConcurrencyStatus::Available => "the batch queue is full",
68                ConcurrencyStatus::MaxedOut => "the batch queue is full and maximum concurrency reached",
69            },
70        })
71    }
72}
73
74pub type BatchResult<T, E> = std::result::Result<T, BatchError<E>>;
75
76impl<E: Display> From<RecvError> for BatchError<E> {
77    fn from(rx_err: RecvError) -> Self {
78        BatchError::Rx(rx_err)
79    }
80}
81
82impl<T, E: Display> From<SendError<T>> for BatchError<E> {
83    fn from(_tx_err: SendError<T>) -> Self {
84        BatchError::Tx
85    }
86}
87
88impl<E> BatchError<E>
89where
90    E: Display,
91{
92    /// Get the inner error for general batch failures, otherwise self.
93    pub fn inner(self) -> BatchResult<E, E> {
94        match self {
95            BatchError::BatchFailed(source) => Ok(source),
96            _ => Err(self),
97        }
98    }
99}