1use std::fmt::Display;
4
5use thiserror::Error;
6use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
7
8#[derive(Error, Debug, Clone)]
10#[non_exhaustive]
11pub enum BatchError<E: Display> {
12 #[error("Unable to send item to the worker for batching: channel closed")]
16 Tx,
17
18 #[error("Error while waiting for batch results: channel closed. {0}")]
22 Rx(#[from] RecvError),
23
24 #[error("Batch item rejected: {0}")]
28 Rejected(RejectionReason),
29
30 #[error("The entire batch failed")]
32 BatchFailed(#[source] E),
33
34 #[error("The processor violated its invariants")]
36 ProcessorInvariantViolation(#[source] ProcessorInvariantViolation),
37
38 #[error("Resource acquisition failed")]
40 ResourceAcquisitionFailed(#[source] E),
41
42 #[error("The batch was cancelled")]
44 Cancelled,
45
46 #[error("The batch processing panicked")]
48 Panic,
49}
50
51#[derive(Error, Debug, Clone)]
53#[non_exhaustive]
54pub enum ProcessorInvariantViolation {
55 #[error(
57 "The processor returned the wrong number of outputs: expected {expected}, got {actual}"
58 )]
59 WrongNumberOfOutputs {
60 expected: usize,
62 actual: usize,
64 },
65}
66
67#[derive(Debug, Clone, Copy)]
69#[non_exhaustive]
70pub enum RejectionReason {
71 BatchQueueFull(ConcurrencyStatus),
73}
74
75#[derive(Debug, Clone, Copy)]
77#[non_exhaustive]
78pub enum ConcurrencyStatus {
79 Available,
83 MaxedOut,
85}
86
87impl Display for RejectionReason {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.write_str(match self {
90 RejectionReason::BatchQueueFull(concurrency) => match concurrency {
91 ConcurrencyStatus::Available => "the batch queue is full",
92 ConcurrencyStatus::MaxedOut => {
93 "the batch queue is full and maximum concurrency reached"
94 }
95 },
96 })
97 }
98}
99
100pub type BatchResult<T, E> = std::result::Result<T, BatchError<E>>;
102
103impl<T, E: Display> From<SendError<T>> for BatchError<E> {
104 fn from(_tx_err: SendError<T>) -> Self {
105 BatchError::Tx
106 }
107}
108
109impl<E> BatchError<E>
110where
111 E: Display,
112{
113 pub fn inner(self) -> BatchResult<E, E> {
115 match self {
116 BatchError::BatchFailed(source) => Ok(source),
117 BatchError::ResourceAcquisitionFailed(source) => Ok(source),
118 _ => Err(self),
119 }
120 }
121}