use async_nats::error::Error as NatsError;
pub use anyhow::{Context, Error, Result, anyhow, anyhow as error, bail, ensure};
pub trait PipelineErrorExt {
fn try_into_pipeline_error(self) -> Result<PipelineError, Error>;
fn either_pipeline_error(self) -> either::Either<PipelineError, Error>;
}
impl PipelineErrorExt for Error {
fn try_into_pipeline_error(self) -> Result<PipelineError, Error> {
self.downcast::<PipelineError>()
}
fn either_pipeline_error(self) -> either::Either<PipelineError, Error> {
match self.downcast::<PipelineError>() {
Ok(err) => either::Left(err),
Err(err) => either::Right(err),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
#[error("Generic error: {0}")]
Generic(String),
#[error("Link failed: Edge already set")]
EdgeAlreadySet,
#[error("Disconnected source; no edge on which to send data")]
NoEdge,
#[error("SegmentSink is not connected to an EgressPort")]
NoNetworkEdge,
#[error("Unlinked request; initiating request task was dropped or cancelled")]
DetachedStreamReceiver,
#[error("Unlinked response; response task was dropped or cancelled")]
DetachedStreamSender,
#[error("Serialzation Error: {0}")]
SerializationError(String),
#[error("Deserialization Error: {0}")]
DeserializationError(String),
#[error("Failed to issue request to the control plane: {0}")]
ControlPlaneRequestError(String),
#[error("Failed to establish a streaming connection: {0}")]
ConnectionFailed(String),
#[error("Generate Error: {0}")]
GenerateError(Error),
#[error("An endpoint URL must have the format: namespace/component/endpoint")]
InvalidEndpointFormat,
#[error("NATS Request Error: {0}")]
NatsRequestError(#[from] NatsError<async_nats::jetstream::context::RequestErrorKind>),
#[error("NATS Get Stream Error: {0}")]
NatsGetStreamError(#[from] NatsError<async_nats::jetstream::context::GetStreamErrorKind>),
#[error("NATS Create Stream Error: {0}")]
NatsCreateStreamError(#[from] NatsError<async_nats::jetstream::context::CreateStreamErrorKind>),
#[error("NATS Consumer Error: {0}")]
NatsConsumerError(#[from] NatsError<async_nats::jetstream::stream::ConsumerErrorKind>),
#[error("NATS Batch Error: {0}")]
NatsBatchError(#[from] NatsError<async_nats::jetstream::consumer::pull::BatchErrorKind>),
#[error("NATS Publish Error: {0}")]
NatsPublishError(#[from] NatsError<async_nats::client::PublishErrorKind>),
#[error("NATS Connect Error: {0}")]
NatsConnectError(#[from] NatsError<async_nats::ConnectErrorKind>),
#[error("NATS Subscriber Error: {0}")]
NatsSubscriberError(#[from] async_nats::SubscribeError),
#[error("Local IP Address Error: {0}")]
LocalIpAddressError(#[from] local_ip_address::Error),
#[error("Prometheus Error: {0}")]
PrometheusError(#[from] prometheus::Error),
#[error("Other NATS Error: {0}")]
NatsError(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error("Two Part Codec Error: {0}")]
TwoPartCodec(#[from] TwoPartCodecError),
#[error("Serde Json Error: {0}")]
SerdeJsonError(#[from] serde_json::Error),
#[error("NATS KV Err: {0} for bucket '{1}")]
KeyValueError(String, String),
#[error("Service temporarily unavailable: {0}")]
ServiceOverloaded(String),
}
#[derive(Debug, thiserror::Error)]
pub enum TwoPartCodecError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("Message size {0} exceeds the maximum allowed size of {1} bytes")]
MessageTooLarge(usize, usize),
#[error("Invalid message: {0}")]
InvalidMessage(String),
#[error("Checksum mismatch")]
ChecksumMismatch,
}