// fukurow_streaming/config.rs

1//! # Streaming Configuration
2//!
3//! Configuration for streaming processors and connections
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
/// Top-level configuration for a streaming processor instance.
///
/// Bundles the backend selection, its connection parameters, and the
/// processing/monitoring knobs into one serializable unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    /// Which messaging backend this processor targets.
    pub processor_type: ProcessorType,

    /// Backend-specific connection settings.
    ///
    /// NOTE(review): nothing here enforces that this variant agrees with
    /// `processor_type` — presumably callers keep the two in sync; verify.
    pub connection: ConnectionConfig,

    /// Batching, timeout, concurrency, and retry settings.
    pub processing: ProcessingConfig,

    /// Metrics and health-check settings.
    pub monitoring: MonitoringConfig,
}
23
24/// Processor type
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub enum ProcessorType {
27    Kafka,
28    NATS,
29    Redis,
30    RabbitMQ,
31}
32
/// Backend-specific connection configuration.
///
/// Serialized in serde's adjacently-tagged form, e.g.
/// `{"type": "Kafka", "config": { ... }}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "config")]
pub enum ConnectionConfig {
    /// Kafka connection
    Kafka(KafkaConfig),

    /// NATS connection
    NATS(NATSConfig),

    /// Redis connection
    Redis(RedisConfig),

    /// RabbitMQ connection
    RabbitMQ(RabbitMQConfig),
}
49
50/// Kafka configuration
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct KafkaConfig {
53    /// Bootstrap servers
54    pub bootstrap_servers: Vec<String>,
55
56    /// Group ID
57    pub group_id: String,
58
59    /// Topics to consume from
60    pub consume_topics: Vec<String>,
61
62    /// Topic to produce to
63    pub produce_topic: String,
64
65    /// Additional Kafka properties
66    pub properties: HashMap<String, String>,
67}
68
/// NATS connection configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NATSConfig {
    /// NATS server URLs.
    pub servers: Vec<String>,

    /// Subject to subscribe to.
    pub subject: String,

    /// Queue group for load balancing; `None` means no queue group
    /// (every subscriber receives every message).
    pub queue_group: Option<String>,

    /// Authentication credentials; `None` for unauthenticated connections.
    /// NOTE(review): unclear whether this is a credentials file path or an
    /// inline token — confirm against the connector that consumes it.
    pub credentials: Option<String>,
}
84
/// Redis Streams connection configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisConfig {
    /// Redis server URL.
    pub url: String,

    /// Stream key to consume from.
    pub stream_key: String,

    /// Consumer group name.
    pub consumer_group: String,

    /// This consumer's name within the group.
    pub consumer_name: String,

    /// Database number; `None` presumably falls back to the server
    /// default (db 0) — confirm in the connector.
    pub database: Option<u8>,
}
103
/// RabbitMQ connection configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RabbitMQConfig {
    /// RabbitMQ server URL.
    pub url: String,

    /// Exchange name.
    pub exchange: String,

    /// Routing key.
    pub routing_key: String,

    /// Queue name.
    pub queue: String,

    /// Exchange type as a free-form string.
    /// NOTE(review): stringly-typed — an enum (direct/fanout/topic/headers)
    /// would reject typos at deserialization time; left as-is here.
    pub exchange_type: String,
}
122
/// Event-processing configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingConfig {
    /// Number of events processed per batch.
    pub batch_size: usize,

    /// Per-batch processing timeout in seconds.
    pub processing_timeout_seconds: u64,

    /// Maximum number of processors running concurrently.
    pub max_concurrent_processors: usize,

    /// Capacity of internal queues/buffers between stages.
    pub buffer_size: usize,

    /// Retry/backoff policy applied on processing failure.
    pub retry: RetryConfig,
}
141
142/// Retry configuration
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct RetryConfig {
145    /// Maximum retry attempts
146    pub max_attempts: u32,
147
148    /// Initial backoff duration in milliseconds
149    pub initial_backoff_ms: u64,
150
151    /// Maximum backoff duration in milliseconds
152    pub max_backoff_ms: u64,
153
154    /// Backoff multiplier
155    pub backoff_multiplier: f64,
156}
157
/// Metrics and health-check configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Enable metrics collection.
    pub enable_metrics: bool,

    /// Prefix prepended to emitted metric names.
    pub metrics_prefix: String,

    /// Enable periodic health checks.
    pub enable_health_checks: bool,

    /// Interval between health checks in seconds.
    pub health_check_interval_seconds: u64,
}
173
174impl Default for StreamingConfig {
175    fn default() -> Self {
176        Self {
177            processor_type: ProcessorType::Kafka,
178            connection: ConnectionConfig::Kafka(KafkaConfig {
179                bootstrap_servers: vec!["localhost:9092".to_string()],
180                group_id: "fukurow-streaming".to_string(),
181                consume_topics: vec!["security-events".to_string()],
182                produce_topic: "reasoning-results".to_string(),
183                properties: HashMap::new(),
184            }),
185            processing: ProcessingConfig {
186                batch_size: 100,
187                processing_timeout_seconds: 30,
188                max_concurrent_processors: 10,
189                buffer_size: 1000,
190                retry: RetryConfig {
191                    max_attempts: 3,
192                    initial_backoff_ms: 100,
193                    max_backoff_ms: 10000,
194                    backoff_multiplier: 2.0,
195                },
196            },
197            monitoring: MonitoringConfig {
198                enable_metrics: true,
199                metrics_prefix: "fukurow_streaming".to_string(),
200                enable_health_checks: true,
201                health_check_interval_seconds: 30,
202            },
203        }
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults must point at the local Kafka setup with the expected knobs.
    #[test]
    fn test_default_config() {
        let config = StreamingConfig::default();

        if let ConnectionConfig::Kafka(kafka) = &config.connection {
            assert_eq!(kafka.bootstrap_servers, vec!["localhost:9092".to_string()]);
            assert_eq!(kafka.group_id, "fukurow-streaming");
        } else {
            panic!("Expected Kafka config");
        }

        assert_eq!(config.processing.batch_size, 100);
        assert_eq!(config.monitoring.metrics_prefix, "fukurow_streaming");
    }

    /// KafkaConfig must survive a JSON round-trip intact.
    #[test]
    fn test_kafka_config_serialization() {
        let mut properties = HashMap::new();
        properties.insert("auto.offset.reset".to_string(), "earliest".to_string());

        let original = KafkaConfig {
            bootstrap_servers: vec!["kafka1:9092".to_string(), "kafka2:9092".to_string()],
            group_id: "test-group".to_string(),
            consume_topics: vec!["events".to_string()],
            produce_topic: "results".to_string(),
            properties,
        };

        let json = serde_json::to_string(&original).unwrap();
        let roundtripped: KafkaConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(roundtripped.bootstrap_servers.len(), 2);
        assert_eq!(roundtripped.group_id, "test-group");
    }

    /// Plain construction of a retry policy keeps its field values.
    #[test]
    fn test_retry_config() {
        let retry = RetryConfig {
            max_attempts: 5,
            initial_backoff_ms: 200,
            max_backoff_ms: 30000,
            backoff_multiplier: 1.5,
        };

        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.backoff_multiplier, 1.5);
    }
}