scouter_settings/
events.rs

1use scouter_types::PyHelperFuncs;
2use serde::Serialize;
3
4// see: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
5#[derive(Clone, Serialize)]
6pub struct KafkaSettings {
7    pub brokers: String,
8    pub num_workers: usize,
9    pub topics: Vec<String>,
10    pub group_id: String,
11    pub username: Option<String>,
12    pub password: Option<String>,
13    pub security_protocol: String,
14    pub sasl_mechanism: String,
15    pub offset_reset: String,
16    pub cert_location: Option<String>,
17}
18
19impl KafkaSettings {
20    pub fn __str__(&self) -> String {
21        // serialize the struct to a string
22        PyHelperFuncs::__str__(self)
23    }
24}
25
26impl std::fmt::Debug for KafkaSettings {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("KafkaSettings")
29            .field("brokers", &self.brokers)
30            .field("num_workers", &self.num_workers)
31            .field("topics", &self.topics)
32            .field("group_id", &self.group_id)
33            .field("username", &self.username)
34            .field("password", &self.password.as_ref().map(|_| "***"))
35            .field("security_protocol", &self.security_protocol)
36            .field("offset_reset", &self.offset_reset)
37            .field("sasl_mechanism", &self.sasl_mechanism)
38            .finish()
39    }
40}
41
42impl Default for KafkaSettings {
43    fn default() -> Self {
44        let brokers =
45            std::env::var("KAFKA_BROKERS").unwrap_or_else(|_| "localhost:9092".to_string());
46
47        let num_workers = std::env::var("KAFKA_WORKER_COUNT")
48            .unwrap_or_else(|_| "3".to_string())
49            .parse::<usize>()
50            .unwrap();
51
52        let topics = std::env::var("KAFKA_TOPIC")
53            .unwrap_or_else(|_| "scouter_monitoring".to_string())
54            .split(',')
55            .map(|s| s.to_string())
56            .collect();
57
58        let group_id = std::env::var("KAFKA_GROUP").unwrap_or("scouter".to_string());
59        let offset_reset = std::env::var("KAFKA_OFFSET_RESET")
60            .unwrap_or_else(|_| "earliest".to_string())
61            .to_string();
62        let username: Option<String> = std::env::var("KAFKA_USERNAME").ok();
63        let password: Option<String> = std::env::var("KAFKA_PASSWORD").ok();
64
65        let security_protocol = std::env::var("KAFKA_SECURITY_PROTOCOL")
66            .ok()
67            .unwrap_or_else(|| "SASL_SSL".to_string());
68        let sasl_mechanism = std::env::var("KAFKA_SASL_MECHANISM")
69            .ok()
70            .unwrap_or_else(|| "PLAIN".to_string());
71        let cert_location = std::env::var("KAFKA_CERT_LOCATION").ok();
72
73        Self {
74            brokers,
75            num_workers,
76            topics,
77            group_id,
78            username,
79            password,
80            security_protocol,
81            sasl_mechanism,
82            offset_reset,
83            cert_location,
84        }
85    }
86}
87
88#[derive(Debug, Clone, Serialize)]
89pub struct RabbitMQSettings {
90    pub num_consumers: usize,
91    pub prefetch_count: u16,
92    pub queue: String,
93    pub consumer_tag: String,
94    pub address: String,
95}
96
97impl RabbitMQSettings {
98    pub fn __str__(&self) -> String {
99        // serialize the struct to a string
100        PyHelperFuncs::__str__(self)
101    }
102}
103
104impl Default for RabbitMQSettings {
105    fn default() -> Self {
106        let num_consumers = std::env::var("RABBITMQ_CONSUMER_COUNT")
107            .unwrap_or_else(|_| "3".to_string())
108            .parse::<usize>()
109            .unwrap();
110
111        let prefetch_count = std::env::var("RABBITMQ_PREFETCH_COUNT")
112            .unwrap_or_else(|_| "10".to_string())
113            .parse::<u16>()
114            .unwrap();
115
116        let address = std::env::var("RABBITMQ_ADDR")
117            .unwrap_or_else(|_| "amqp://guest:guest@127.0.0.1:5672/%2f".to_string());
118
119        let queue =
120            std::env::var("RABBITMQ_QUEUE").unwrap_or_else(|_| "scouter_monitoring".to_string());
121
122        let consumer_tag =
123            std::env::var("RABBITMQ_CONSUMER_TAG").unwrap_or_else(|_| "scouter".to_string());
124
125        Self {
126            num_consumers,
127            prefetch_count,
128            queue,
129            consumer_tag,
130            address,
131        }
132    }
133}
134
135#[derive(Debug, Clone, Serialize)]
136pub struct RedisSettings {
137    pub num_consumers: usize,
138    pub channel: String,
139    pub address: String,
140}
141
142impl RedisSettings {
143    pub fn __str__(&self) -> String {
144        // serialize the struct to a string
145        PyHelperFuncs::__str__(self)
146    }
147}
148
149impl Default for RedisSettings {
150    fn default() -> Self {
151        let num_consumers = std::env::var("REDIS_CONSUMER_COUNT")
152            .unwrap_or_else(|_| "3".to_string())
153            .parse::<usize>()
154            .unwrap();
155        let channel =
156            std::env::var("REDIS_CHANNEL").unwrap_or_else(|_| "scouter_monitoring".to_string());
157
158        let address =
159            std::env::var("REDIS_ADDR").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
160
161        Self {
162            num_consumers,
163            channel,
164            address,
165        }
166    }
167}
168
/// Settings for the HTTP consumer.
#[derive(Debug, Clone)]
pub struct HttpConsumerSettings {
    /// Worker count, taken from `HTTP_CONSUMER_WORKER_COUNT`.
    pub num_workers: usize,
}

impl Default for HttpConsumerSettings {
    /// Reads `HTTP_CONSUMER_WORKER_COUNT`, falling back to `1` when it is unset.
    ///
    /// # Panics
    ///
    /// Panics if the variable is set but is not a valid `usize` (after trimming
    /// whitespace). The message names the variable and the offending value.
    fn default() -> Self {
        const VAR: &str = "HTTP_CONSUMER_WORKER_COUNT";

        // A malformed count is a configuration error: fail loudly and say which
        // variable is wrong instead of surfacing a bare `ParseIntError`.
        let num_workers = match std::env::var(VAR) {
            Ok(raw) => raw
                .trim()
                .parse::<usize>()
                .unwrap_or_else(|err| panic!("invalid {} value {:?}: {}", VAR, raw, err)),
            Err(_) => 1,
        };

        Self { num_workers }
    }
}
182}