Skip to main content

libdd_data_pipeline/trace_exporter/
error.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::telemetry::error::TelemetryError;
5use crate::trace_exporter::msgpack_decoder::decode::error::DecodeError;
6use http::StatusCode;
7use libdd_common::http_common;
8use rmp_serde::encode::Error as EncodeError;
9use std::error::Error;
10use std::fmt::{Debug, Display};
11
/// Kinds of errors that can occur when interacting with the agent.
#[derive(Debug, PartialEq)]
pub enum AgentErrorKind {
    /// The agent returned an empty response.
    EmptyResponse,
}

impl Display for AgentErrorKind {
    /// Writes the fixed, human-readable message associated with the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::EmptyResponse => "Agent empty response",
        };
        f.write_str(text)
    }
}
26
/// Kinds of errors that can occur during the builder process.
///
/// Every variant carries a `String` that is appended to the variant's label when the error is
/// displayed.
#[derive(Debug, PartialEq)]
pub enum BuilderErrorKind {
    /// An invalid URI was provided.
    /// The associated `String` contains the underlying error message.
    InvalidUri(String),
    /// The telemetry configuration is invalid.
    InvalidTelemetryConfig(String),
    /// The configuration is incompatible or otherwise invalid.
    InvalidConfiguration(String),
}

impl Display for BuilderErrorKind {
    /// Formats the error as `<label>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidUri(msg) => f.write_fmt(format_args!("Invalid URI: {msg}")),
            Self::InvalidTelemetryConfig(msg) => {
                f.write_fmt(format_args!("Invalid telemetry configuration: {msg}"))
            }
            Self::InvalidConfiguration(msg) => {
                f.write_fmt(format_args!("Invalid configuration: {msg}"))
            }
        }
    }
}
52
/// Kinds of errors that are internal to the exporter.
#[derive(Debug, PartialEq)]
pub enum InternalErrorKind {
    /// Some background workers are in an invalid state. The associated `String` contains the
    /// error message.
    InvalidWorkerState(String),
}

impl Display for InternalErrorKind {
    /// Formats the error as `Invalid worker state: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The enum has a single variant, so the pattern below is irrefutable; adding a variant
        // turns this into a compile error, just like a non-exhaustive `match` would.
        let Self::InvalidWorkerState(msg) = self;
        f.write_fmt(format_args!("Invalid worker state: {msg}"))
    }
}
70
/// Represents different kinds of network errors.
///
/// A kind is stored in a [`NetworkError`] and returned by value from [`NetworkError::kind`].
#[derive(Copy, Clone, Debug)]
pub enum NetworkErrorKind {
    /// Indicates an error with the body of the request/response.
    Body,
    /// Indicates that the request was canceled.
    Canceled,
    /// Indicates that the connection was closed.
    ConnectionClosed,
    /// Indicates that the message is too large.
    MessageTooLarge,
    /// Indicates a parsing error.
    Parse,
    /// Indicates that the request timed out.
    TimedOut,
    /// Indicates an unknown error.
    Unknown,
    /// Indicates that the status code is incorrect.
    WrongStatus,
}
91
/// Represents a network error, containing the kind of error and the source error.
#[derive(Debug)]
pub struct NetworkError {
    /// Category of the failure, exposed through `kind()`.
    kind: NetworkErrorKind,
    /// Underlying error; it is what `source()` returns and what `Display` prints.
    source: anyhow::Error,
}
98
99impl Error for NetworkError {
100    fn source(&self) -> Option<&(dyn Error + 'static)> {
101        self.source.chain().next()
102    }
103}
104
105impl NetworkError {
106    fn new<E: Into<anyhow::Error>>(kind: NetworkErrorKind, source: E) -> Self {
107        Self {
108            kind,
109            source: source.into(),
110        }
111    }
112
113    pub fn kind(&self) -> NetworkErrorKind {
114        self.kind
115    }
116}
117
118impl Display for NetworkError {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        #[allow(clippy::unwrap_used)]
121        std::fmt::Display::fmt(self.source().unwrap(), f)
122    }
123}
124
/// Error made of a status code and a message.
///
/// Its `Display` output labels the two parts "Error code" and "Response".
#[derive(Debug, PartialEq)]
pub struct RequestError {
    /// Status code of the error, returned by `status()`.
    code: StatusCode,
    /// Message of the error, returned by `msg()`.
    msg: String,
}
130
131impl Display for RequestError {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(
134            f,
135            "{}",
136            format_args!("Error code: {}, Response: {}", self.code, self.msg)
137        )
138    }
139}
140
141impl RequestError {
142    pub fn new(code: StatusCode, msg: &str) -> Self {
143        Self {
144            code,
145            msg: msg.to_owned(),
146        }
147    }
148
149    pub fn status(&self) -> StatusCode {
150        self.code
151    }
152
153    pub fn msg(&self) -> &str {
154        &self.msg
155    }
156}
157
/// Errors that can occur during shutdown.
#[derive(Debug)]
pub enum ShutdownError {
    /// Shutdown timed out; the associated `Duration` is reported, in seconds, by `Display`.
    TimedOut(std::time::Duration),
}

impl Display for ShutdownError {
    /// Formats the error as `Shutdown timed out after <seconds>s`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let elapsed_secs = match self {
            Self::TimedOut(timeout) => timeout.as_secs_f32(),
        };
        f.write_fmt(format_args!("Shutdown timed out after {}s", elapsed_secs))
    }
}
172
/// TraceExporterError holds different types of errors that occur when handling traces.
#[derive(Debug)]
pub enum TraceExporterError {
    /// Error in agent response processing.
    Agent(AgentErrorKind),
    /// Invalid builder input.
    Builder(BuilderErrorKind),
    /// Error internal to the trace exporter.
    Internal(InternalErrorKind),
    /// Error in deserialization of incoming trace payload.
    Deserialization(DecodeError),
    /// Generic IO error.
    Io(std::io::Error),
    /// Shutdown has not succeeded after some time.
    Shutdown(ShutdownError),
    /// Telemetry related error.
    Telemetry(String),
    /// Network related error (i.e. hyper error).
    Network(NetworkError),
    /// Agent responded with an error code.
    Request(RequestError),
    /// Error in serialization of processed trace payload.
    Serialization(EncodeError),
}
197
198impl Display for TraceExporterError {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        match self {
201            TraceExporterError::Agent(e) => write!(f, "Agent response processing: {e}"),
202            TraceExporterError::Builder(e) => write!(f, "Invalid builder input: {e}"),
203            TraceExporterError::Internal(e) => write!(f, "Internal: {e}"),
204            TraceExporterError::Deserialization(e) => {
205                write!(f, "Deserialization of incoming payload: {e}")
206            }
207            TraceExporterError::Io(e) => write!(f, "IO: {e}"),
208            TraceExporterError::Shutdown(e) => write!(f, "Shutdown: {e}"),
209            TraceExporterError::Telemetry(e) => write!(f, "Telemetry: {e}"),
210            TraceExporterError::Network(e) => write!(f, "Network: {e}"),
211            TraceExporterError::Request(e) => write!(f, "Agent responded with an error code: {e}"),
212            TraceExporterError::Serialization(e) => {
213                write!(f, "Serialization of trace payload payload: {e}")
214            }
215        }
216    }
217}
218
219impl From<EncodeError> for TraceExporterError {
220    fn from(value: EncodeError) -> Self {
221        TraceExporterError::Serialization(value)
222    }
223}
224
225impl From<http::uri::InvalidUri> for TraceExporterError {
226    fn from(value: http::uri::InvalidUri) -> Self {
227        TraceExporterError::Builder(BuilderErrorKind::InvalidUri(value.to_string()))
228    }
229}
230
231impl From<http_common::Error> for TraceExporterError {
232    fn from(err: http_common::Error) -> Self {
233        match err {
234            http_common::Error::Client(e) => TraceExporterError::from(e),
235            http_common::Error::Other(e) => TraceExporterError::Network(NetworkError {
236                kind: NetworkErrorKind::Unknown,
237                source: e,
238            }),
239            http_common::Error::Infallible(e) => match e {},
240        }
241    }
242}
243
244impl From<http_common::ClientError> for TraceExporterError {
245    fn from(err: http_common::ClientError) -> Self {
246        use http_common::ErrorKind;
247        let network_kind = match err.kind() {
248            ErrorKind::Closed => NetworkErrorKind::ConnectionClosed,
249            ErrorKind::Parse => NetworkErrorKind::Parse,
250            ErrorKind::Canceled => NetworkErrorKind::Canceled,
251            ErrorKind::Incomplete | ErrorKind::WriteAborted => NetworkErrorKind::Body,
252            ErrorKind::ParseStatus => NetworkErrorKind::WrongStatus,
253            ErrorKind::Timeout => NetworkErrorKind::TimedOut,
254            ErrorKind::Other => NetworkErrorKind::Unknown,
255        };
256        TraceExporterError::Network(NetworkError::new(network_kind, err))
257    }
258}
259
260impl From<DecodeError> for TraceExporterError {
261    fn from(err: DecodeError) -> Self {
262        TraceExporterError::Deserialization(err)
263    }
264}
265
266impl From<std::io::Error> for TraceExporterError {
267    fn from(err: std::io::Error) -> Self {
268        TraceExporterError::Io(err)
269    }
270}
271
272impl From<TelemetryError> for TraceExporterError {
273    fn from(value: TelemetryError) -> Self {
274        match value {
275            TelemetryError::Builder(e) => {
276                TraceExporterError::Builder(BuilderErrorKind::InvalidTelemetryConfig(e))
277            }
278            TelemetryError::Send(e) => TraceExporterError::Telemetry(e),
279        }
280    }
281}
282
// No method is overridden here: the `Error` trait's default implementations are used, and the
// inner error's message is already included in the `Display` output.
impl Error for TraceExporterError {}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// A `RequestError` gives back the status code and message it was built with.
    #[test]
    fn test_request_error() {
        let not_found = RequestError::new(StatusCode::NOT_FOUND, "Not found");
        assert_eq!(not_found.msg(), "Not found");
        assert_eq!(not_found.status(), StatusCode::NOT_FOUND);
    }
}