scouter-events 0.21.0

Logic for setting up and running Scouter event-driven consumers and producers
use futures::io;
use pyo3::PyErr;

#[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
use rdkafka::error::KafkaError;
use thiserror::Error;

use crate::queue::bus::Event;

#[derive(Error, Debug)]
pub enum FeatureQueueError {
    #[error("{0}")]
    InvalidFormatError(String),

    #[error("Failed to create drift record: {0}")]
    DriftRecordError(String),

    #[error("Failed to create alert record: {0}")]
    AlertRecordError(String),

    #[error("Failed to get feature")]
    GetFeatureError,

    #[error("Missing feature map")]
    MissingFeatureMapError,

    #[error("invalid data type detected for feature: {0}")]
    InvalidFeatureTypeError(String),

    #[error("invalid value detected for feature: {0}, error: {1}")]
    InvalidValueError(String, String),

    #[error("Failed to get bin given bin id")]
    GetBinError,
}

impl From<FeatureQueueError> for PyErr {
    fn from(err: FeatureQueueError) -> PyErr {
        PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string())
    }
}

#[derive(Error, Debug)]
pub enum EventError {
    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
    #[error("Failed to connect to kakfa consumer")]
    ConnectKafkaConsumerError(#[source] KafkaError),

    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
    #[error("Failed to connect to kakfa producer")]
    ConnectKafkaProducerError(#[source] KafkaError),

    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
    #[error("Failed to subscribe to topic")]
    SubscribeTopicError(#[source] KafkaError),

    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
    #[error("Failed to flush kafka producer")]
    FlushKafkaProducerError(#[source] KafkaError),

    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
    #[error("Failed to create producer")]
    CreateKafkaProducerError(#[source] KafkaError),

    #[cfg(any(feature = "kafka", feature = "kafka-vendored"))]
    #[error("Failed to publish message")]
    PublishKafkaMessageError(#[source] KafkaError),

    #[cfg(feature = "rabbitmq")]
    #[error("Failed to connect to RabbitMQ")]
    ConnectRabbitMQError(#[source] lapin::Error),

    #[cfg(feature = "rabbitmq")]
    #[error("Failed to setup RabbitMQ QoS")]
    SetupRabbitMQQosError(#[source] lapin::Error),

    #[cfg(feature = "rabbitmq")]
    #[error("Failed to declare RabbitMQ queue")]
    DeclareRabbitMQQueueError(#[source] lapin::Error),

    #[cfg(feature = "rabbitmq")]
    #[error("Failed to consume RabbitMQ queue")]
    ConsumeRabbitMQError(#[source] lapin::Error),

    #[cfg(feature = "rabbitmq")]
    #[error("Failed to create RabbitMQ channel")]
    CreateRabbitMQChannelError(#[source] lapin::Error),

    #[cfg(feature = "rabbitmq")]
    #[error("Failed to publish RabbitMQ message")]
    PublishRabbitMQMessageError(#[source] lapin::Error),

    #[cfg(feature = "rabbitmq")]
    #[error("Failed to flush RabbitMQ channel")]
    FlushRabbitMQChannelError(#[source] lapin::Error),

    #[cfg(feature = "redis_events")]
    #[error("Failed to connect to Redis")]
    RedisError(#[source] redis::RedisError),

    #[error(transparent)]
    ReqwestError(#[from] reqwest::Error),

    #[error(transparent)]
    HeaderError(#[from] reqwest::header::InvalidHeaderValue),

    #[error("Unauthorized")]
    UnauthorizedError,

    #[error(transparent)]
    SerdeJsonError(#[from] serde_json::Error),

    #[error(transparent)]
    SendEntityError(#[from] tokio::sync::mpsc::error::SendError<Event>),

    #[error("Failed to push to queue. Queue is full")]
    QueuePushError,

    #[error("Failed to push to queue. Max retries exceeded")]
    QueuePushRetryError,

    #[error("Queue not supported for feature entity")]
    QueueNotSupportedFeatureError,

    #[error("Queue not supported for metrics entity")]
    QueueNotSupportedMetricsError,

    #[error("Queue not supported for LLM entity")]
    QueueNotSupportedLLMError,

    #[error("Failed to signal startup")]
    SignalStartupError,

    #[error("Failed to signal startup")]
    SignalCompletionError,

    #[error("Failed to setup tokio runtime for ScouterQueue: {0}")]
    SetupTokioRuntimeError(#[source] io::Error),

    #[error("Failed to start receiver tokio runtime: {0}")]
    StartupReceiverError(#[source] tokio::sync::oneshot::error::RecvError),

    #[error("Failed to shutdown receiver tokio runtime: {0}")]
    ShutdownReceiverError(#[source] tokio::sync::oneshot::error::RecvError),

    #[error("Kafka feature not enabled")]
    KafkaFeatureNotEnabledError,

    #[error("RabbitMQ feature not enabled")]
    RabbitMQFeatureNotEnabledError,

    #[error("Redis feature not enabled")]
    RedisFeatureNotEnabledError,

    #[error("Invalid compressions type")]
    InvalidCompressionTypeError,

    #[error("Failed to initialize QueueBus")]
    InitializationError,

    #[error(transparent)]
    JoinError(#[from] tokio::task::JoinError),

    #[error("Event task failed to start")]
    EventTaskFailedToStartError,

    #[error("Background task failed to start")]
    BackgroundTaskFailedToStartError,

    #[error("Event task read error")]
    EventTaskReadError,

    #[error("Missing background tx channel")]
    BackgroundTxMissingError,

    #[error("Missing event tx channel")]
    EventTxMissingError,

    #[error("Failed to acquire read lock: {0}")]
    ReadLockError(String),

    #[error("Poison error occurred")]
    PoisonError(String),

    #[error(transparent)]
    GrpcClientError(#[from] scouter_tonic::error::ClientError),

    #[error("Failed to get queue settings for GenAI queue")]
    MissingQueueSettingsError,
}

#[derive(Error, Debug)]
pub enum PyEventError {
    #[error(transparent)]
    EventError(#[from] EventError),

    #[error("{0}")]
    Error(String),

    #[error(transparent)]
    TypeError(#[from] scouter_types::error::TypeError),

    #[error(transparent)]
    ProfileError(#[from] scouter_types::error::ProfileError),

    #[error("Failed to get queue: {0}")]
    MissingQueueError(String),

    #[error("Transport config was not provided")]
    MissingTransportConfig,

    #[error("Failed to shutdown queue")]
    ShutdownQueueError(#[source] pyo3::PyErr),

    #[error("Failed to convert TransportConfig type to py object: {0}")]
    ConvertToPyError(#[source] pyo3::PyErr),

    #[error("Failed to clear all queues. Pending events exist")]
    PendingEventsError,

    #[error("Drift profile alias must be set")]
    DriftProfileAliasMustBeSet,

    #[error("Invalid drift profile format. Expected Dict[str, DriftProfile], List[DriftProfile], or single DriftProfile")]
    InvalidDriftProfileFormat,
}
impl From<PyEventError> for PyErr {
    fn from(err: PyEventError) -> PyErr {
        let msg = err.to_string();
        pyo3::exceptions::PyRuntimeError::new_err(msg)
    }
}

impl<'a, 'py> From<pyo3::CastError<'a, 'py>> for PyEventError {
    fn from(err: pyo3::CastError<'a, 'py>) -> Self {
        PyEventError::Error(err.to_string())
    }
}

impl From<PyErr> for PyEventError {
    fn from(err: PyErr) -> Self {
        PyEventError::Error(err.to_string())
    }
}