dynamo_runtime/pipeline/
error.rs1use async_nats::error::Error as NatsError;
19
20pub use anyhow::{anyhow, anyhow as error, bail, ensure, Context, Error, Result};
21
22pub trait PipelineErrorExt {
23 fn try_into_pipeline_error(self) -> Result<PipelineError, Error>;
25
26 fn either_pipeline_error(self) -> either::Either<PipelineError, Error>;
29}
30
31impl PipelineErrorExt for Error {
32 fn try_into_pipeline_error(self) -> Result<PipelineError, Error> {
33 self.downcast::<PipelineError>()
34 }
35
36 fn either_pipeline_error(self) -> either::Either<PipelineError, Error> {
37 match self.downcast::<PipelineError>() {
38 Ok(err) => either::Left(err),
39 Err(err) => either::Right(err),
40 }
41 }
42}
43
44#[derive(Debug, thiserror::Error)]
45pub enum PipelineError {
46 #[error("Generic error: {0}")]
48 Generic(String),
49
50 #[error("Link failed: Edge already set")]
52 EdgeAlreadySet,
53
54 #[error("Disconnected source; no edge on which to send data")]
56 NoEdge,
57
58 #[error("SegmentSink is not connected to an EgressPort")]
59 NoNetworkEdge,
60
61 #[error("Unlinked request; initiating request task was dropped or cancelled")]
67 DetachedStreamReceiver,
68
69 #[error("Unlinked response; response task was dropped or cancelled")]
73 DetachedStreamSender,
74
75 #[error("Serialzation Error: {0}")]
76 SerializationError(String),
77
78 #[error("Deserialization Error: {0}")]
79 DeserializationError(String),
80
81 #[error("Failed to issue request to the control plane: {0}")]
82 ControlPlaneRequestError(String),
83
84 #[error("Failed to establish a streaming connection: {0}")]
85 ConnectionFailed(String),
86
87 #[error("Generate Error: {0}")]
88 GenerateError(Error),
89
90 #[error("An endpoint URL must have the format: namespace/component/endpoint")]
91 InvalidEndpointFormat,
92
93 #[error("NATS Request Error: {0}")]
94 NatsRequestError(#[from] NatsError<async_nats::jetstream::context::RequestErrorKind>),
95
96 #[error("NATS Get Stream Error: {0}")]
97 NatsGetStreamError(#[from] NatsError<async_nats::jetstream::context::GetStreamErrorKind>),
98
99 #[error("NATS Create Stream Error: {0}")]
100 NatsCreateStreamError(#[from] NatsError<async_nats::jetstream::context::CreateStreamErrorKind>),
101
102 #[error("NATS Consumer Error: {0}")]
103 NatsConsumerError(#[from] NatsError<async_nats::jetstream::stream::ConsumerErrorKind>),
104
105 #[error("NATS Batch Error: {0}")]
106 NatsBatchError(#[from] NatsError<async_nats::jetstream::consumer::pull::BatchErrorKind>),
107
108 #[error("NATS Publish Error: {0}")]
109 NatsPublishError(#[from] NatsError<async_nats::client::PublishErrorKind>),
110
111 #[error("NATS Connect Error: {0}")]
112 NatsConnectError(#[from] NatsError<async_nats::ConnectErrorKind>),
113
114 #[error("NATS Subscriber Error: {0}")]
115 NatsSubscriberError(#[from] async_nats::SubscribeError),
116
117 #[error("Local IP Address Error: {0}")]
118 LocalIpAddressError(#[from] local_ip_address::Error),
119
120 #[error("Prometheus Error: {0}")]
121 PrometheusError(#[from] prometheus::Error),
122
123 #[error("Other NATS Error: {0}")]
124 NatsError(#[from] Box<dyn std::error::Error + Send + Sync>),
125
126 #[error("Two Part Codec Error: {0}")]
127 TwoPartCodec(#[from] TwoPartCodecError),
128
129 #[error("Serde Json Error: {0}")]
130 SerdeJsonError(#[from] serde_json::Error),
131
132 #[error("NATS KV Err: {0} for bucket '{1}")]
133 KeyValueError(String, String),
134
135 #[error("Service temporarily unavailable: {0}")]
137 ServiceOverloaded(String),
138}
139
140#[derive(Debug, thiserror::Error)]
141pub enum TwoPartCodecError {
142 #[error("I/O error: {0}")]
143 Io(#[from] std::io::Error),
144
145 #[error("Message size {0} exceeds the maximum allowed size of {1} bytes")]
146 MessageTooLarge(usize, usize),
147
148 #[error("Invalid message: {0}")]
149 InvalidMessage(String),
150
151 #[error("Checksum mismatch")]
152 ChecksumMismatch,
153}