1use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
/// Top-level configuration for the streaming pipeline: which backend is
/// targeted, how to connect to it, how events are processed, and how the
/// processor is monitored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    /// Which streaming backend this processor targets.
    // NOTE(review): appears redundant with the variant tag carried by
    // `connection` — confirm the loader keeps the two in sync.
    pub processor_type: ProcessorType,

    /// Backend-specific connection settings (one variant per backend).
    pub connection: ConnectionConfig,

    /// Batching, concurrency, buffering and retry behaviour.
    pub processing: ProcessingConfig,

    /// Metrics and health-check settings.
    pub monitoring: MonitoringConfig,
}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum ProcessorType {
27 Kafka,
28 NATS,
29 Redis,
30 RabbitMQ,
31}
32
/// Backend-specific connection settings.
///
/// Serialized in serde's adjacently-tagged form, e.g.
/// `{"type": "Kafka", "config": { ... }}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "config")]
pub enum ConnectionConfig {
    /// Kafka cluster settings.
    Kafka(KafkaConfig),

    /// NATS server settings.
    NATS(NATSConfig),

    /// Redis stream settings.
    Redis(RedisConfig),

    /// RabbitMQ exchange/queue settings.
    RabbitMQ(RabbitMQConfig),
}
49
/// Connection settings for an Apache Kafka cluster.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConfig {
    /// Broker addresses, e.g. `"localhost:9092"`.
    pub bootstrap_servers: Vec<String>,

    /// Consumer group id used when consuming.
    pub group_id: String,

    /// Topics events are consumed from.
    pub consume_topics: Vec<String>,

    /// Topic results are produced to.
    pub produce_topic: String,

    /// Additional raw client properties, passed through as key/value pairs
    // (presumably librdkafka-style properties such as `auto.offset.reset` —
    // TODO confirm against the client in use).
    pub properties: HashMap<String, String>,
}
68
/// Connection settings for a NATS deployment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NATSConfig {
    /// Server URLs to connect to.
    pub servers: Vec<String>,

    /// Subject used for messaging.
    pub subject: String,

    /// Optional queue group for load-balanced consumption.
    pub queue_group: Option<String>,

    /// Optional credentials value.
    // NOTE(review): can't tell from here whether this is a file path or the
    // credential contents — confirm against the connection code.
    pub credentials: Option<String>,
}
84
/// Connection settings for Redis stream consumption.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisConfig {
    /// Redis connection URL.
    pub url: String,

    /// Key of the stream to read.
    pub stream_key: String,

    /// Consumer-group name for the stream.
    pub consumer_group: String,

    /// This consumer's name within the group.
    pub consumer_name: String,

    /// Optional logical database index to select.
    pub database: Option<u8>,
}
103
/// Connection settings for a RabbitMQ broker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RabbitMQConfig {
    /// Broker connection URL.
    pub url: String,

    /// Exchange to publish to / bind against.
    pub exchange: String,

    /// Routing key used with the exchange.
    pub routing_key: String,

    /// Queue name to consume from.
    pub queue: String,

    /// Exchange type (e.g. `"direct"`, `"topic"`).
    // NOTE(review): stringly-typed — an enum would be safer, but changing the
    // field type would break the serialized format and callers.
    pub exchange_type: String,
}
122
/// Event-processing behaviour shared by all backends.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingConfig {
    /// Maximum number of events handled per batch.
    pub batch_size: usize,

    /// Processing timeout in seconds.
    // NOTE(review): unclear whether this applies per event or per batch —
    // confirm against the processing loop.
    pub processing_timeout_seconds: u64,

    /// Upper bound on concurrently running processors.
    pub max_concurrent_processors: usize,

    /// Capacity of the internal event buffer.
    pub buffer_size: usize,

    /// Retry/backoff policy applied to failed processing.
    pub retry: RetryConfig,
}
141
/// Exponential-backoff retry policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of attempts before giving up.
    pub max_attempts: u32,

    /// Delay before the first retry, in milliseconds.
    pub initial_backoff_ms: u64,

    /// Upper cap on the backoff delay, in milliseconds.
    pub max_backoff_ms: u64,

    /// Factor the delay is multiplied by between attempts
    /// (2.0 in the default config, i.e. doubling).
    pub backoff_multiplier: f64,
}
157
/// Metrics and health-check settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Whether metric emission is enabled.
    pub enable_metrics: bool,

    /// Prefix prepended to emitted metric names.
    pub metrics_prefix: String,

    /// Whether periodic health checks are enabled.
    pub enable_health_checks: bool,

    /// Interval between health checks, in seconds.
    pub health_check_interval_seconds: u64,
}
173
174impl Default for StreamingConfig {
175 fn default() -> Self {
176 Self {
177 processor_type: ProcessorType::Kafka,
178 connection: ConnectionConfig::Kafka(KafkaConfig {
179 bootstrap_servers: vec!["localhost:9092".to_string()],
180 group_id: "fukurow-streaming".to_string(),
181 consume_topics: vec!["security-events".to_string()],
182 produce_topic: "reasoning-results".to_string(),
183 properties: HashMap::new(),
184 }),
185 processing: ProcessingConfig {
186 batch_size: 100,
187 processing_timeout_seconds: 30,
188 max_concurrent_processors: 10,
189 buffer_size: 1000,
190 retry: RetryConfig {
191 max_attempts: 3,
192 initial_backoff_ms: 100,
193 max_backoff_ms: 10000,
194 backoff_multiplier: 2.0,
195 },
196 },
197 monitoring: MonitoringConfig {
198 enable_metrics: true,
199 metrics_prefix: "fukurow_streaming".to_string(),
200 enable_health_checks: true,
201 health_check_interval_seconds: 30,
202 },
203 }
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config is a local Kafka setup with the expected
    /// processing and monitoring values.
    #[test]
    fn test_default_config() {
        let config = StreamingConfig::default();

        let ConnectionConfig::Kafka(kafka) = &config.connection else {
            panic!("Expected Kafka config");
        };
        assert_eq!(kafka.bootstrap_servers, vec!["localhost:9092".to_string()]);
        assert_eq!(kafka.group_id, "fukurow-streaming");

        assert_eq!(config.processing.batch_size, 100);
        assert_eq!(config.monitoring.metrics_prefix, "fukurow_streaming");
    }

    /// A `KafkaConfig` survives a JSON round trip intact.
    #[test]
    fn test_kafka_config_serialization() {
        let original = KafkaConfig {
            bootstrap_servers: vec!["kafka1:9092".to_string(), "kafka2:9092".to_string()],
            group_id: "test-group".to_string(),
            consume_topics: vec!["events".to_string()],
            produce_topic: "results".to_string(),
            properties: [("auto.offset.reset".to_string(), "earliest".to_string())]
                .into_iter()
                .collect(),
        };

        let json = serde_json::to_string(&original).unwrap();
        let roundtripped: KafkaConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(roundtripped.bootstrap_servers.len(), 2);
        assert_eq!(roundtripped.group_id, "test-group");
    }

    /// `RetryConfig` fields hold the values they are constructed with.
    #[test]
    fn test_retry_config() {
        let policy = RetryConfig {
            max_attempts: 5,
            initial_backoff_ms: 200,
            max_backoff_ms: 30000,
            backoff_multiplier: 1.5,
        };

        assert_eq!(policy.max_attempts, 5);
        assert_eq!(policy.backoff_multiplier, 1.5);
    }
}