use std::fmt::Display;
use thiserror::Error;
use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum BatchError<E: Display> {
#[error("Unable to send item to the worker for batching: channel closed")]
Tx,
#[error("Error while waiting for batch results: channel closed. {0}")]
Rx(#[from] RecvError),
#[error("Batch item rejected: {0}")]
Rejected(RejectionReason),
#[error("The entire batch failed")]
BatchFailed(#[source] E),
#[error("The processor violated its invariants")]
ProcessorInvariantViolation(#[source] ProcessorInvariantViolation),
#[error("Resource acquisition failed")]
ResourceAcquisitionFailed(#[source] E),
#[error("The batch was cancelled")]
Cancelled,
#[error("The batch processing panicked")]
Panic,
}
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ProcessorInvariantViolation {
#[error("The processor returned the wrong number of outputs: expected {expected}, got {actual}")]
WrongNumberOfOutputs {
expected: usize,
actual: usize,
},
}
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum RejectionReason {
BatchQueueFull(ConcurrencyStatus),
}
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum ConcurrencyStatus {
Available,
MaxedOut,
}
impl Display for RejectionReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
RejectionReason::BatchQueueFull(concurrency) => match concurrency {
ConcurrencyStatus::Available => "the batch queue is full",
ConcurrencyStatus::MaxedOut => {
"the batch queue is full and maximum concurrency reached"
}
},
})
}
}
pub type BatchResult<T, E> = std::result::Result<T, BatchError<E>>;
impl<T, E: Display> From<SendError<T>> for BatchError<E> {
fn from(_tx_err: SendError<T>) -> Self {
BatchError::Tx
}
}
impl<E> BatchError<E>
where
E: Display,
{
pub fn inner(self) -> BatchResult<E, E> {
match self {
BatchError::BatchFailed(source) => Ok(source),
BatchError::ResourceAcquisitionFailed(source) => Ok(source),
_ => Err(self),
}
}
}