libdd_data_pipeline/trace_exporter/
error.rs1use crate::telemetry::error::TelemetryError;
5use crate::trace_exporter::msgpack_decoder::decode::error::DecodeError;
6use http::StatusCode;
7use libdd_common::http_common;
8use rmp_serde::encode::Error as EncodeError;
9use std::error::Error;
10use std::fmt::{Debug, Display};
11
12#[derive(Debug, PartialEq)]
14pub enum AgentErrorKind {
15 EmptyResponse,
17}
18
19impl Display for AgentErrorKind {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 match self {
22 AgentErrorKind::EmptyResponse => write!(f, "Agent empty response"),
23 }
24 }
25}
26
27#[derive(Debug, PartialEq)]
29pub enum BuilderErrorKind {
30 InvalidUri(String),
33 InvalidTelemetryConfig(String),
35 InvalidConfiguration(String),
37}
38
39impl Display for BuilderErrorKind {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 BuilderErrorKind::InvalidUri(msg) => write!(f, "Invalid URI: {msg}"),
43 BuilderErrorKind::InvalidTelemetryConfig(msg) => {
44 write!(f, "Invalid telemetry configuration: {msg}")
45 }
46 BuilderErrorKind::InvalidConfiguration(msg) => {
47 write!(f, "Invalid configuration: {msg}")
48 }
49 }
50 }
51}
52
53#[derive(Debug, PartialEq)]
55pub enum InternalErrorKind {
56 InvalidWorkerState(String),
59}
60
61impl Display for InternalErrorKind {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 InternalErrorKind::InvalidWorkerState(msg) => {
65 write!(f, "Invalid worker state: {msg}")
66 }
67 }
68 }
69}
70
71#[derive(Copy, Clone, Debug)]
73pub enum NetworkErrorKind {
74 Body,
76 Canceled,
78 ConnectionClosed,
80 MessageTooLarge,
82 Parse,
84 TimedOut,
86 Unknown,
88 WrongStatus,
90}
91
92#[derive(Debug)]
94pub struct NetworkError {
95 kind: NetworkErrorKind,
96 source: anyhow::Error,
97}
98
99impl Error for NetworkError {
100 fn source(&self) -> Option<&(dyn Error + 'static)> {
101 self.source.chain().next()
102 }
103}
104
105impl NetworkError {
106 fn new<E: Into<anyhow::Error>>(kind: NetworkErrorKind, source: E) -> Self {
107 Self {
108 kind,
109 source: source.into(),
110 }
111 }
112
113 pub fn kind(&self) -> NetworkErrorKind {
114 self.kind
115 }
116}
117
118impl Display for NetworkError {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 #[allow(clippy::unwrap_used)]
121 std::fmt::Display::fmt(self.source().unwrap(), f)
122 }
123}
124
125#[derive(Debug, PartialEq)]
126pub struct RequestError {
127 code: StatusCode,
128 msg: String,
129}
130
131impl Display for RequestError {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 write!(
134 f,
135 "{}",
136 format_args!("Error code: {}, Response: {}", self.code, self.msg)
137 )
138 }
139}
140
141impl RequestError {
142 pub fn new(code: StatusCode, msg: &str) -> Self {
143 Self {
144 code,
145 msg: msg.to_owned(),
146 }
147 }
148
149 pub fn status(&self) -> StatusCode {
150 self.code
151 }
152
153 pub fn msg(&self) -> &str {
154 &self.msg
155 }
156}
157
158#[derive(Debug)]
159pub enum ShutdownError {
160 TimedOut(std::time::Duration),
161}
162
163impl Display for ShutdownError {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 match self {
166 ShutdownError::TimedOut(dur) => {
167 write!(f, "Shutdown timed out after {}s", dur.as_secs_f32())
168 }
169 }
170 }
171}
172
173#[derive(Debug)]
175pub enum TraceExporterError {
176 Agent(AgentErrorKind),
178 Builder(BuilderErrorKind),
180 Internal(InternalErrorKind),
182 Deserialization(DecodeError),
184 Io(std::io::Error),
186 Shutdown(ShutdownError),
188 Telemetry(String),
190 Network(NetworkError),
192 Request(RequestError),
194 Serialization(EncodeError),
196}
197
198impl Display for TraceExporterError {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 match self {
201 TraceExporterError::Agent(e) => write!(f, "Agent response processing: {e}"),
202 TraceExporterError::Builder(e) => write!(f, "Invalid builder input: {e}"),
203 TraceExporterError::Internal(e) => write!(f, "Internal: {e}"),
204 TraceExporterError::Deserialization(e) => {
205 write!(f, "Deserialization of incoming payload: {e}")
206 }
207 TraceExporterError::Io(e) => write!(f, "IO: {e}"),
208 TraceExporterError::Shutdown(e) => write!(f, "Shutdown: {e}"),
209 TraceExporterError::Telemetry(e) => write!(f, "Telemetry: {e}"),
210 TraceExporterError::Network(e) => write!(f, "Network: {e}"),
211 TraceExporterError::Request(e) => write!(f, "Agent responded with an error code: {e}"),
212 TraceExporterError::Serialization(e) => {
213 write!(f, "Serialization of trace payload payload: {e}")
214 }
215 }
216 }
217}
218
219impl From<EncodeError> for TraceExporterError {
220 fn from(value: EncodeError) -> Self {
221 TraceExporterError::Serialization(value)
222 }
223}
224
225impl From<http::uri::InvalidUri> for TraceExporterError {
226 fn from(value: http::uri::InvalidUri) -> Self {
227 TraceExporterError::Builder(BuilderErrorKind::InvalidUri(value.to_string()))
228 }
229}
230
231impl From<http_common::Error> for TraceExporterError {
232 fn from(err: http_common::Error) -> Self {
233 match err {
234 http_common::Error::Client(e) => TraceExporterError::from(e),
235 http_common::Error::Other(e) => TraceExporterError::Network(NetworkError {
236 kind: NetworkErrorKind::Unknown,
237 source: e,
238 }),
239 http_common::Error::Infallible(e) => match e {},
240 }
241 }
242}
243
244impl From<http_common::ClientError> for TraceExporterError {
245 fn from(err: http_common::ClientError) -> Self {
246 use http_common::ErrorKind;
247 let network_kind = match err.kind() {
248 ErrorKind::Closed => NetworkErrorKind::ConnectionClosed,
249 ErrorKind::Parse => NetworkErrorKind::Parse,
250 ErrorKind::Canceled => NetworkErrorKind::Canceled,
251 ErrorKind::Incomplete | ErrorKind::WriteAborted => NetworkErrorKind::Body,
252 ErrorKind::ParseStatus => NetworkErrorKind::WrongStatus,
253 ErrorKind::Timeout => NetworkErrorKind::TimedOut,
254 ErrorKind::Other => NetworkErrorKind::Unknown,
255 };
256 TraceExporterError::Network(NetworkError::new(network_kind, err))
257 }
258}
259
260impl From<DecodeError> for TraceExporterError {
261 fn from(err: DecodeError) -> Self {
262 TraceExporterError::Deserialization(err)
263 }
264}
265
266impl From<std::io::Error> for TraceExporterError {
267 fn from(err: std::io::Error) -> Self {
268 TraceExporterError::Io(err)
269 }
270}
271
272impl From<TelemetryError> for TraceExporterError {
273 fn from(value: TelemetryError) -> Self {
274 match value {
275 TelemetryError::Builder(e) => {
276 TraceExporterError::Builder(BuilderErrorKind::InvalidTelemetryConfig(e))
277 }
278 TelemetryError::Send(e) => TraceExporterError::Telemetry(e),
279 }
280 }
281}
282
283impl Error for TraceExporterError {}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288
289 #[test]
290 fn test_request_error() {
291 let error = RequestError::new(StatusCode::NOT_FOUND, "Not found");
292 assert_eq!(error.status(), StatusCode::NOT_FOUND);
293 assert_eq!(error.msg(), "Not found")
294 }
295}