scouter_events/
error.rs

1use futures::io;
2use pyo3::PyErr;
3
4#[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
5use rdkafka::error::KafkaError;
6use thiserror::Error;
7
8use crate::queue::bus::Event;
9
10#[derive(Error, Debug)]
11pub enum FeatureQueueError {
12    #[error("{0}")]
13    InvalidFormatError(String),
14
15    #[error("Failed to create drift record: {0}")]
16    DriftRecordError(String),
17
18    #[error("Failed to create alert record: {0}")]
19    AlertRecordError(String),
20
21    #[error("Failed to get feature")]
22    GetFeatureError,
23
24    #[error("Missing feature map")]
25    MissingFeatureMapError,
26
27    #[error("invalid data type detected for feature: {0}")]
28    InvalidFeatureTypeError(String),
29
30    #[error("invalid value detected for feature: {0}, error: {1}")]
31    InvalidValueError(String, String),
32
33    #[error("Failed to get bin given bin id")]
34    GetBinError,
35}
36
37impl From<FeatureQueueError> for PyErr {
38    fn from(err: FeatureQueueError) -> PyErr {
39        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
40    }
41}
42
43#[derive(Error, Debug)]
44pub enum EventError {
45    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
46    #[error("Failed to connect to kakfa consumer")]
47    ConnectKafkaConsumerError(#[source] KafkaError),
48
49    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
50    #[error("Failed to connect to kakfa producer")]
51    ConnectKafkaProducerError(#[source] KafkaError),
52
53    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
54    #[error("Failed to subscribe to topic")]
55    SubscribeTopicError(#[source] KafkaError),
56
57    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
58    #[error("Failed to flush kafka producer")]
59    FlushKafkaProducerError(#[source] KafkaError),
60
61    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
62    #[error("Failed to create producer")]
63    CreateKafkaProducerError(#[source] KafkaError),
64
65    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
66    #[error("Failed to publish message")]
67    PublishKafkaMessageError(#[source] KafkaError),
68
69    #[cfg(feature = "rabbitmq")]
70    #[error("Failed to connect to RabbitMQ")]
71    ConnectRabbitMQError(#[source] lapin::Error),
72
73    #[cfg(feature = "rabbitmq")]
74    #[error("Failed to setup RabbitMQ QoS")]
75    SetupRabbitMQQosError(#[source] lapin::Error),
76
77    #[cfg(feature = "rabbitmq")]
78    #[error("Failed to declare RabbitMQ queue")]
79    DeclareRabbitMQQueueError(#[source] lapin::Error),
80
81    #[cfg(feature = "rabbitmq")]
82    #[error("Failed to consume RabbitMQ queue")]
83    ConsumeRabbitMQError(#[source] lapin::Error),
84
85    #[cfg(feature = "rabbitmq")]
86    #[error("Failed to create RabbitMQ channel")]
87    CreateRabbitMQChannelError(#[source] lapin::Error),
88
89    #[cfg(feature = "rabbitmq")]
90    #[error("Failed to publish RabbitMQ message")]
91    PublishRabbitMQMessageError(#[source] lapin::Error),
92
93    #[cfg(feature = "rabbitmq")]
94    #[error("Failed to flush RabbitMQ channel")]
95    FlushRabbitMQChannelError(#[source] lapin::Error),
96
97    #[cfg(feature = "redis_events")]
98    #[error("Failed to connect to Redis")]
99    RedisError(#[source] redis::RedisError),
100
101    #[error(transparent)]
102    ReqwestError(#[from] reqwest::Error),
103
104    #[error(transparent)]
105    HeaderError(#[from] reqwest::header::InvalidHeaderValue),
106
107    #[error("Unauthorized")]
108    UnauthorizedError,
109
110    #[error(transparent)]
111    SerdeJsonError(#[from] serde_json::Error),
112
113    #[error(transparent)]
114    SendEntityError(#[from] tokio::sync::mpsc::error::SendError<Event>),
115
116    #[error("Failed to push to queue. Queue is full")]
117    QueuePushError,
118
119    #[error("Failed to push to queue. Max retries exceeded")]
120    QueuePushRetryError,
121
122    #[error("Queue not supported for feature entity")]
123    QueueNotSupportedFeatureError,
124
125    #[error("Queue not supported for metrics entity")]
126    QueueNotSupportedMetricsError,
127
128    #[error("Queue not supported for LLM entity")]
129    QueueNotSupportedLLMError,
130
131    #[error("Failed to signal startup")]
132    SignalStartupError,
133
134    #[error("Failed to signal startup")]
135    SignalCompletionError,
136
137    #[error("Failed to setup tokio runtime for ScouterQueue: {0}")]
138    SetupTokioRuntimeError(#[source] io::Error),
139
140    #[error("Failed to start receiver tokio runtime: {0}")]
141    StartupReceiverError(#[source] tokio::sync::oneshot::error::RecvError),
142
143    #[error("Failed to shutdown receiver tokio runtime: {0}")]
144    ShutdownReceiverError(#[source] tokio::sync::oneshot::error::RecvError),
145
146    #[error("Kafka feature not enabled")]
147    KafkaFeatureNotEnabledError,
148
149    #[error("RabbitMQ feature not enabled")]
150    RabbitMQFeatureNotEnabledError,
151
152    #[error("Redis feature not enabled")]
153    RedisFeatureNotEnabledError,
154
155    #[error("Invalid compressions type")]
156    InvalidCompressionTypeError,
157
158    #[error("Failed to initialize QueueBus")]
159    InitializationError,
160
161    #[error(transparent)]
162    JoinError(#[from] tokio::task::JoinError),
163
164    #[error("Event task failed to start")]
165    EventTaskFailedToStartError,
166
167    #[error("Background task failed to start")]
168    BackgroundTaskFailedToStartError,
169
170    #[error("Event task read error")]
171    EventTaskReadError,
172
173    #[error("Missing background tx channel")]
174    BackgroundTxMissingError,
175
176    #[error("Missing event tx channel")]
177    EventTxMissingError,
178
179    #[error("Failed to acquire read lock: {0}")]
180    ReadLockError(String),
181}
182
183#[derive(Error, Debug)]
184pub enum PyEventError {
185    #[error(transparent)]
186    EventError(#[from] EventError),
187
188    #[error(transparent)]
189    PyErr(#[from] pyo3::PyErr),
190
191    #[error("Invalid compressions type")]
192    InvalidCompressionTypeError,
193
194    #[error(transparent)]
195    TypeError(#[from] scouter_types::error::TypeError),
196
197    #[error(transparent)]
198    ProfileError(#[from] scouter_types::error::ProfileError),
199
200    #[error("Failed to get queue: {0}")]
201    MissingQueueError(String),
202
203    #[error("Transport config was not provided")]
204    MissingTransportConfig,
205
206    #[error("Failed to shutdown queue")]
207    ShutdownQueueError(#[source] pyo3::PyErr),
208
209    #[error("Failed to convert TransportConfig type to py object: {0}")]
210    ConvertToPyError(#[source] pyo3::PyErr),
211
212    #[error("Failed to clear all queues. Pending events exist")]
213    PendingEventsError,
214}
215impl From<PyEventError> for PyErr {
216    fn from(err: PyEventError) -> PyErr {
217        let msg = err.to_string();
218        pyo3::exceptions::PyRuntimeError::new_err(msg)
219    }
220}