1use futures::io;
2use pyo3::PyErr;
3
4#[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
5use rdkafka::error::KafkaError;
6use thiserror::Error;
7
8use crate::queue::bus::Event;
9
10#[derive(Error, Debug)]
11pub enum FeatureQueueError {
12 #[error("{0}")]
13 InvalidFormatError(String),
14
15 #[error("Failed to create drift record: {0}")]
16 DriftRecordError(String),
17
18 #[error("Failed to create alert record: {0}")]
19 AlertRecordError(String),
20
21 #[error("Failed to get feature")]
22 GetFeatureError,
23
24 #[error("Missing feature map")]
25 MissingFeatureMapError,
26
27 #[error("invalid data type detected for feature: {0}")]
28 InvalidFeatureTypeError(String),
29
30 #[error("invalid value detected for feature: {0}, error: {1}")]
31 InvalidValueError(String, String),
32
33 #[error("Failed to get bin given bin id")]
34 GetBinError,
35}
36
37impl From<FeatureQueueError> for PyErr {
38 fn from(err: FeatureQueueError) -> PyErr {
39 PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
40 }
41}
42
43#[derive(Error, Debug)]
44pub enum EventError {
45 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
46 #[error("Failed to connect to kakfa consumer")]
47 ConnectKafkaConsumerError(#[source] KafkaError),
48
49 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
50 #[error("Failed to connect to kakfa producer")]
51 ConnectKafkaProducerError(#[source] KafkaError),
52
53 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
54 #[error("Failed to subscribe to topic")]
55 SubscribeTopicError(#[source] KafkaError),
56
57 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
58 #[error("Failed to flush kafka producer")]
59 FlushKafkaProducerError(#[source] KafkaError),
60
61 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
62 #[error("Failed to create producer")]
63 CreateKafkaProducerError(#[source] KafkaError),
64
65 #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
66 #[error("Failed to publish message")]
67 PublishKafkaMessageError(#[source] KafkaError),
68
69 #[cfg(feature = "rabbitmq")]
70 #[error("Failed to connect to RabbitMQ")]
71 ConnectRabbitMQError(#[source] lapin::Error),
72
73 #[cfg(feature = "rabbitmq")]
74 #[error("Failed to setup RabbitMQ QoS")]
75 SetupRabbitMQQosError(#[source] lapin::Error),
76
77 #[cfg(feature = "rabbitmq")]
78 #[error("Failed to declare RabbitMQ queue")]
79 DeclareRabbitMQQueueError(#[source] lapin::Error),
80
81 #[cfg(feature = "rabbitmq")]
82 #[error("Failed to consume RabbitMQ queue")]
83 ConsumeRabbitMQError(#[source] lapin::Error),
84
85 #[cfg(feature = "rabbitmq")]
86 #[error("Failed to create RabbitMQ channel")]
87 CreateRabbitMQChannelError(#[source] lapin::Error),
88
89 #[cfg(feature = "rabbitmq")]
90 #[error("Failed to publish RabbitMQ message")]
91 PublishRabbitMQMessageError(#[source] lapin::Error),
92
93 #[cfg(feature = "rabbitmq")]
94 #[error("Failed to flush RabbitMQ channel")]
95 FlushRabbitMQChannelError(#[source] lapin::Error),
96
97 #[cfg(feature = "redis_events")]
98 #[error("Failed to connect to Redis")]
99 RedisError(#[source] redis::RedisError),
100
101 #[error(transparent)]
102 ReqwestError(#[from] reqwest::Error),
103
104 #[error(transparent)]
105 HeaderError(#[from] reqwest::header::InvalidHeaderValue),
106
107 #[error("Unauthorized")]
108 UnauthorizedError,
109
110 #[error(transparent)]
111 SerdeJsonError(#[from] serde_json::Error),
112
113 #[error(transparent)]
114 SendEntityError(#[from] tokio::sync::mpsc::error::SendError<Event>),
115
116 #[error("Failed to push to queue. Queue is full")]
117 QueuePushError,
118
119 #[error("Failed to push to queue. Max retries exceeded")]
120 QueuePushRetryError,
121
122 #[error("Queue not supported for feature entity")]
123 QueueNotSupportedFeatureError,
124
125 #[error("Queue not supported for metrics entity")]
126 QueueNotSupportedMetricsError,
127
128 #[error("Queue not supported for LLM entity")]
129 QueueNotSupportedLLMError,
130
131 #[error("Failed to signal startup")]
132 SignalStartupError,
133
134 #[error("Failed to signal startup")]
135 SignalCompletionError,
136
137 #[error("Failed to setup tokio runtime for ScouterQueue: {0}")]
138 SetupTokioRuntimeError(#[source] io::Error),
139
140 #[error("Failed to start receiver tokio runtime: {0}")]
141 StartupReceiverError(#[source] tokio::sync::oneshot::error::RecvError),
142
143 #[error("Failed to shutdown receiver tokio runtime: {0}")]
144 ShutdownReceiverError(#[source] tokio::sync::oneshot::error::RecvError),
145
146 #[error("Kafka feature not enabled")]
147 KafkaFeatureNotEnabledError,
148
149 #[error("RabbitMQ feature not enabled")]
150 RabbitMQFeatureNotEnabledError,
151
152 #[error("Redis feature not enabled")]
153 RedisFeatureNotEnabledError,
154
155 #[error("Invalid compressions type")]
156 InvalidCompressionTypeError,
157
158 #[error("Failed to initialize QueueBus")]
159 InitializationError,
160
161 #[error(transparent)]
162 JoinError(#[from] tokio::task::JoinError),
163
164 #[error("Event task failed to start")]
165 EventTaskFailedToStartError,
166
167 #[error("Background task failed to start")]
168 BackgroundTaskFailedToStartError,
169
170 #[error("Event task read error")]
171 EventTaskReadError,
172
173 #[error("Missing background tx channel")]
174 BackgroundTxMissingError,
175
176 #[error("Missing event tx channel")]
177 EventTxMissingError,
178
179 #[error("Failed to acquire read lock: {0}")]
180 ReadLockError(String),
181}
182
183#[derive(Error, Debug)]
184pub enum PyEventError {
185 #[error(transparent)]
186 EventError(#[from] EventError),
187
188 #[error(transparent)]
189 PyErr(#[from] pyo3::PyErr),
190
191 #[error("Invalid compressions type")]
192 InvalidCompressionTypeError,
193
194 #[error(transparent)]
195 TypeError(#[from] scouter_types::error::TypeError),
196
197 #[error(transparent)]
198 ProfileError(#[from] scouter_types::error::ProfileError),
199
200 #[error("Failed to get queue: {0}")]
201 MissingQueueError(String),
202
203 #[error("Transport config was not provided")]
204 MissingTransportConfig,
205
206 #[error("Failed to shutdown queue")]
207 ShutdownQueueError(#[source] pyo3::PyErr),
208
209 #[error("Failed to convert TransportConfig type to py object: {0}")]
210 ConvertToPyError(#[source] pyo3::PyErr),
211
212 #[error("Failed to clear all queues. Pending events exist")]
213 PendingEventsError,
214}
215impl From<PyEventError> for PyErr {
216 fn from(err: PyEventError) -> PyErr {
217 let msg = err.to_string();
218 pyo3::exceptions::PyRuntimeError::new_err(msg)
219 }
220}