dynamo_runtime/pipeline/
error.rs1use async_nats::error::Error as NatsError;
7
8pub use anyhow::{Context, Error, Result, anyhow, anyhow as error, bail, ensure};
9
10pub trait PipelineErrorExt {
11 fn try_into_pipeline_error(self) -> Result<PipelineError, Error>;
13
14 fn either_pipeline_error(self) -> either::Either<PipelineError, Error>;
17}
18
19impl PipelineErrorExt for Error {
20 fn try_into_pipeline_error(self) -> Result<PipelineError, Error> {
21 self.downcast::<PipelineError>()
22 }
23
24 fn either_pipeline_error(self) -> either::Either<PipelineError, Error> {
25 match self.downcast::<PipelineError>() {
26 Ok(err) => either::Left(err),
27 Err(err) => either::Right(err),
28 }
29 }
30}
31
32#[derive(Debug, thiserror::Error)]
33pub enum PipelineError {
34 #[error("Generic error: {0}")]
36 Generic(String),
37
38 #[error("Link failed: Edge already set")]
40 EdgeAlreadySet,
41
42 #[error("Disconnected source; no edge on which to send data")]
44 NoEdge,
45
46 #[error("SegmentSink is not connected to an EgressPort")]
47 NoNetworkEdge,
48
49 #[error("Unlinked request; initiating request task was dropped or cancelled")]
55 DetachedStreamReceiver,
56
57 #[error("Unlinked response; response task was dropped or cancelled")]
61 DetachedStreamSender,
62
63 #[error("Serialzation Error: {0}")]
64 SerializationError(String),
65
66 #[error("Deserialization Error: {0}")]
67 DeserializationError(String),
68
69 #[error("Failed to issue request to the control plane: {0}")]
70 ControlPlaneRequestError(String),
71
72 #[error("Failed to establish a streaming connection: {0}")]
73 ConnectionFailed(String),
74
75 #[error("Generate Error: {0}")]
76 GenerateError(Error),
77
78 #[error("An endpoint URL must have the format: namespace/component/endpoint")]
79 InvalidEndpointFormat,
80
81 #[error("NATS Request Error: {0}")]
82 NatsRequestError(#[from] NatsError<async_nats::jetstream::context::RequestErrorKind>),
83
84 #[error("NATS Get Stream Error: {0}")]
85 NatsGetStreamError(#[from] NatsError<async_nats::jetstream::context::GetStreamErrorKind>),
86
87 #[error("NATS Create Stream Error: {0}")]
88 NatsCreateStreamError(#[from] NatsError<async_nats::jetstream::context::CreateStreamErrorKind>),
89
90 #[error("NATS Consumer Error: {0}")]
91 NatsConsumerError(#[from] NatsError<async_nats::jetstream::stream::ConsumerErrorKind>),
92
93 #[error("NATS Batch Error: {0}")]
94 NatsBatchError(#[from] NatsError<async_nats::jetstream::consumer::pull::BatchErrorKind>),
95
96 #[error("NATS Publish Error: {0}")]
97 NatsPublishError(#[from] NatsError<async_nats::client::PublishErrorKind>),
98
99 #[error("NATS Connect Error: {0}")]
100 NatsConnectError(#[from] NatsError<async_nats::ConnectErrorKind>),
101
102 #[error("NATS Subscriber Error: {0}")]
103 NatsSubscriberError(#[from] async_nats::SubscribeError),
104
105 #[error("Local IP Address Error: {0}")]
106 LocalIpAddressError(#[from] local_ip_address::Error),
107
108 #[error("Prometheus Error: {0}")]
109 PrometheusError(#[from] prometheus::Error),
110
111 #[error("Other NATS Error: {0}")]
112 NatsError(#[from] Box<dyn std::error::Error + Send + Sync>),
113
114 #[error("Two Part Codec Error: {0}")]
115 TwoPartCodec(#[from] TwoPartCodecError),
116
117 #[error("Serde Json Error: {0}")]
118 SerdeJsonError(#[from] serde_json::Error),
119
120 #[error("NATS KV Err: {0} for bucket '{1}")]
121 KeyValueError(String, String),
122
123 #[error("Service temporarily unavailable: {0}")]
125 ServiceOverloaded(String),
126}
127
128#[derive(Debug, thiserror::Error)]
129pub enum TwoPartCodecError {
130 #[error("I/O error: {0}")]
131 Io(#[from] std::io::Error),
132
133 #[error("Message size {0} exceeds the maximum allowed size of {1} bytes")]
134 MessageTooLarge(usize, usize),
135
136 #[error("Invalid message: {0}")]
137 InvalidMessage(String),
138
139 #[error("Checksum mismatch")]
140 ChecksumMismatch,
141}