Skip to main content

batch_aint_one/
error.rs

1//! Errors.
2
3use std::fmt::Display;
4
5use thiserror::Error;
6use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
7
8/// An error that occurred while trying to batch.
9#[derive(Error, Debug, Clone)]
10#[non_exhaustive]
11pub enum BatchError<E: Display> {
12    /// Something went wrong while submitting an input for processing.
13    ///
14    /// Unrecoverable.
15    #[error("Unable to send item to the worker for batching: channel closed")]
16    Tx,
17
18    /// Something went wrong while waiting for the output of a batch.
19    ///
20    /// Unrecoverable.
21    #[error("Error while waiting for batch results: channel closed. {0}")]
22    Rx(#[from] RecvError),
23
24    /// The batch queue for this key is full, so the item was rejected.
25    ///
26    /// Recoverable.
27    #[error("Batch item rejected: {0}")]
28    Rejected(RejectionReason),
29
30    /// Something went wrong while processing a batch.
31    #[error("The entire batch failed")]
32    BatchFailed(#[source] E),
33
34    /// The processor violated its invariants.
35    #[error("The processor violated its invariants")]
36    ProcessorInvariantViolation(#[source] ProcessorInvariantViolation),
37
38    /// Something went wrong while acquiring resources for processing.
39    #[error("Resource acquisition failed")]
40    ResourceAcquisitionFailed(#[source] E),
41
42    /// The batch was cancelled before completion.
43    #[error("The batch was cancelled")]
44    Cancelled,
45
46    /// The batch processing (or resource acquisition) panicked.
47    #[error("The batch processing panicked")]
48    Panic,
49}
50
51/// A processor implementation violated its invariants.
52#[derive(Error, Debug, Clone)]
53#[non_exhaustive]
54pub enum ProcessorInvariantViolation {
55    /// The processor returned the wrong number of outputs for the inputs given.
56    #[error(
57        "The processor returned the wrong number of outputs: expected {expected}, got {actual}"
58    )]
59    WrongNumberOfOutputs {
60        /// The number of inputs given.
61        expected: usize,
62        /// The number of outputs returned.
63        actual: usize,
64    },
65}
66
/// Reason for rejecting a batch item.
///
/// Implements `PartialEq`/`Eq` so that callers can compare reasons directly
/// (for example with `==` or `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RejectionReason {
    /// The batch queue is full.
    ///
    /// Carries the concurrency status that accompanied the rejection.
    BatchQueueFull(ConcurrencyStatus),
}

/// Status of concurrency when rejecting a batch item.
///
/// Implements `PartialEq`/`Eq` so that callers can compare statuses directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConcurrencyStatus {
    /// There is available concurrency to process another batch.
    ///
    /// It might be being used because batches are waiting to be processed.
    Available,
    /// The maximum concurrency for this key has been reached.
    MaxedOut,
}
86
87impl Display for RejectionReason {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.write_str(match self {
90            RejectionReason::BatchQueueFull(concurrency) => match concurrency {
91                ConcurrencyStatus::Available => "the batch queue is full",
92                ConcurrencyStatus::MaxedOut => {
93                    "the batch queue is full and maximum concurrency reached"
94                }
95            },
96        })
97    }
98}
99
100/// Result type for batch operations.
101pub type BatchResult<T, E> = std::result::Result<T, BatchError<E>>;
102
103impl<T, E: Display> From<SendError<T>> for BatchError<E> {
104    fn from(_tx_err: SendError<T>) -> Self {
105        BatchError::Tx
106    }
107}
108
109impl<E> BatchError<E>
110where
111    E: Display,
112{
113    /// Get the inner error for general batch failures, otherwise self.
114    pub fn inner(self) -> BatchResult<E, E> {
115        match self {
116            BatchError::BatchFailed(source) => Ok(source),
117            BatchError::ResourceAcquisitionFailed(source) => Ok(source),
118            _ => Err(self),
119        }
120    }
121}