1use futures::io;
2use pyo3::PyErr;
3use thiserror::Error;
4
5#[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
6use rdkafka::error::KafkaError;
7
8use crate::queue::bus::Event;
9
10#[derive(Error, Debug)]
11pub enum FeatureQueueError {
12 #[error("{0}")]
13 InvalidFormatError(String),
14
15 #[error("Failed to create drift record: {0}")]
16 DriftRecordError(String),
17
18 #[error("Failed to create alert record: {0}")]
19 AlertRecordError(String),
20
21 #[error("Failed to get feature")]
22 GetFeatureError,
23
24 #[error("Missing feature map")]
25 MissingFeatureMapError,
26
27 #[error("invalid data type detected for feature: {0}")]
28 InvalidFeatureTypeError(String),
29
30 #[error("invalid value detected for feature: {0}, error: {1}")]
31 InvalidValueError(String, String),
32
33 #[error("Failed to get bin given bin id")]
34 GetBinError,
35}
36
37impl From<FeatureQueueError> for PyErr {
38 fn from(err: FeatureQueueError) -> PyErr {
39 PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
40 }
41}
42
43#[derive(Error, Debug)]
44pub enum EventError {
45 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
46 #[error("Failed to connect to kakfa consumer")]
47 ConnectKafkaConsumerError(#[source] KafkaError),
48
49 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
50 #[error("Failed to connect to kakfa producer")]
51 ConnectKafkaProducerError(#[source] KafkaError),
52
53 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
54 #[error("Failed to subscribe to topic")]
55 SubscribeTopicError(#[source] KafkaError),
56
57 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
58 #[error("Failed to flush kafka producer")]
59 FlushKafkaProducerError(#[source] KafkaError),
60
61 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
62 #[error("Failed to create producer")]
63 CreateKafkaProducerError(#[source] KafkaError),
64
65 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
66 #[error("Failed to publish message")]
67 PublishKafkaMessageError(#[source] KafkaError),
68
69 #[cfg(feature = "rabbitmq")]
70 #[error("Failed to connect to RabbitMQ")]
71 ConnectRabbitMQError(#[source] lapin::Error),
72
73 #[cfg(feature = "rabbitmq")]
74 #[error("Failed to setup RabbitMQ QoS")]
75 SetupRabbitMQQosError(#[source] lapin::Error),
76
77 #[cfg(feature = "rabbitmq")]
78 #[error("Failed to declare RabbitMQ queue")]
79 DeclareRabbitMQQueueError(#[source] lapin::Error),
80
81 #[cfg(feature = "rabbitmq")]
82 #[error("Failed to consume RabbitMQ queue")]
83 ConsumeRabbitMQError(#[source] lapin::Error),
84
85 #[cfg(feature = "rabbitmq")]
86 #[error("Failed to create RabbitMQ channel")]
87 CreateRabbitMQChannelError(#[source] lapin::Error),
88
89 #[cfg(feature = "rabbitmq")]
90 #[error("Failed to publish RabbitMQ message")]
91 PublishRabbitMQMessageError(#[source] lapin::Error),
92
93 #[cfg(feature = "rabbitmq")]
94 #[error("Failed to flush RabbitMQ channel")]
95 FlushRabbitMQChannelError(#[source] lapin::Error),
96
97 #[cfg(feature = "redis_events")]
98 #[error("Failed to connect to Redis")]
99 RedisError(#[source] redis::RedisError),
100
101 #[error(transparent)]
102 ReqwestError(#[from] reqwest::Error),
103
104 #[error(transparent)]
105 HeaderError(#[from] reqwest::header::InvalidHeaderValue),
106
107 #[error("Unauthorized")]
108 UnauthorizedError,
109
110 #[error(transparent)]
111 SerdeJsonError(#[from] serde_json::Error),
112
113 #[error(transparent)]
114 SendEntityError(#[from] tokio::sync::mpsc::error::SendError<Event>),
115
116 #[error("Failed to push to queue. Queue is full")]
117 QueuePushError,
118
119 #[error("Failed to push to queue. Max retries exceeded")]
120 QueuePushRetryError,
121
122 #[error("Queue not supported for feature entity")]
123 QueueNotSupportedFeatureError,
124
125 #[error("Queue not supported for metrics entity")]
126 QueueNotSupportedMetricsError,
127
128 #[error("Failed to signal startup")]
129 SignalStartupError,
130
131 #[error("Failed to signal startup")]
132 SignalCompletionError,
133
134 #[error("Failed to setup tokio runtime for ScouterQueue: {0}")]
135 SetupTokioRuntimeError(#[source] io::Error),
136
137 #[error("Failed to start receiver tokio runtime: {0}")]
138 StartupReceiverError(#[source] tokio::sync::oneshot::error::RecvError),
139
140 #[error("Failed to shutdown receiver tokio runtime: {0}")]
141 ShutdownReceiverError(#[source] tokio::sync::oneshot::error::RecvError),
142
143 #[error("Kafka feature not enabled")]
144 KafkaFeatureNotEnabledError,
145
146 #[error("RabbitMQ feature not enabled")]
147 RabbitMQFeatureNotEnabledError,
148
149 #[error("Redis feature not enabled")]
150 RedisFeatureNotEnabledError,
151
152 #[error("Invalid compressions type")]
153 InvalidCompressionTypeError,
154
155 #[error("Failed to initialize QueueBus")]
156 InitializationError,
157}
158
159#[derive(Error, Debug)]
160pub enum PyEventError {
161 #[error(transparent)]
162 EventError(#[from] EventError),
163
164 #[error(transparent)]
165 PyErr(#[from] pyo3::PyErr),
166
167 #[error("Invalid compressions type")]
168 InvalidCompressionTypeError,
169
170 #[error(transparent)]
171 TypeError(#[from] scouter_types::error::TypeError),
172
173 #[error(transparent)]
174 ProfileError(#[from] scouter_types::error::ProfileError),
175
176 #[error("Failed to get queue: {0}")]
177 MissingQueueError(String),
178
179 #[error("Transport config was not provided")]
180 MissingTransportConfig,
181
182 #[error("Failed to shutdown queue")]
183 ShutdownQueueError(#[source] pyo3::PyErr),
184
185 #[error("Failed to convert TransportConfig type to py object: {0}")]
186 ConvertToPyError(#[source] pyo3::PyErr),
187}
188impl From<PyEventError> for PyErr {
189 fn from(err: PyEventError) -> PyErr {
190 let msg = err.to_string();
191 pyo3::exceptions::PyRuntimeError::new_err(msg)
192 }
193}