drasi_source_http/
config.rs1use serde::{Deserialize, Serialize};
20
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub struct HttpSourceConfig {
28 pub host: String,
30
31 pub port: u16,
33
34 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub endpoint: Option<String>,
37
38 #[serde(default = "default_timeout_ms")]
40 pub timeout_ms: u64,
41
42 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub adaptive_max_batch_size: Option<usize>,
45
46 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub adaptive_min_batch_size: Option<usize>,
49
50 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub adaptive_max_wait_ms: Option<u64>,
53
54 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub adaptive_min_wait_ms: Option<u64>,
57
58 #[serde(default, skip_serializing_if = "Option::is_none")]
60 pub adaptive_window_secs: Option<u64>,
61
62 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub adaptive_enabled: Option<bool>,
65}
66
67fn default_timeout_ms() -> u64 {
68 10000
69}
70
71impl HttpSourceConfig {
72 pub fn validate(&self) -> anyhow::Result<()> {
81 if self.port == 0 {
82 return Err(anyhow::anyhow!(
83 "Validation error: port cannot be 0. \
84 Please specify a valid port number (1-65535)"
85 ));
86 }
87
88 if self.timeout_ms == 0 {
89 return Err(anyhow::anyhow!(
90 "Validation error: timeout_ms cannot be 0. \
91 Please specify a positive timeout value in milliseconds"
92 ));
93 }
94
95 if let (Some(min), Some(max)) = (self.adaptive_min_batch_size, self.adaptive_max_batch_size)
97 {
98 if min > max {
99 return Err(anyhow::anyhow!(
100 "Validation error: adaptive_min_batch_size ({min}) cannot be greater than \
101 adaptive_max_batch_size ({max})"
102 ));
103 }
104 }
105
106 if let (Some(min), Some(max)) = (self.adaptive_min_wait_ms, self.adaptive_max_wait_ms) {
107 if min > max {
108 return Err(anyhow::anyhow!(
109 "Validation error: adaptive_min_wait_ms ({min}) cannot be greater than \
110 adaptive_max_wait_ms ({max})"
111 ));
112 }
113 }
114
115 Ok(())
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 #[test]
124 fn test_config_deserialization_minimal() {
125 let yaml = r#"
126host: "localhost"
127port: 8080
128"#;
129 let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
130 assert_eq!(config.host, "localhost");
131 assert_eq!(config.port, 8080);
132 assert_eq!(config.endpoint, None);
133 assert_eq!(config.timeout_ms, 10000); assert_eq!(config.adaptive_enabled, None);
135 }
136
137 #[test]
138 fn test_config_deserialization_full() {
139 let yaml = r#"
140host: "0.0.0.0"
141port: 9000
142endpoint: "/events"
143timeout_ms: 5000
144adaptive_max_batch_size: 1000
145adaptive_min_batch_size: 10
146adaptive_max_wait_ms: 500
147adaptive_min_wait_ms: 10
148adaptive_window_secs: 60
149adaptive_enabled: true
150"#;
151 let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
152 assert_eq!(config.host, "0.0.0.0");
153 assert_eq!(config.port, 9000);
154 assert_eq!(config.endpoint, Some("/events".to_string()));
155 assert_eq!(config.timeout_ms, 5000);
156 assert_eq!(config.adaptive_max_batch_size, Some(1000));
157 assert_eq!(config.adaptive_min_batch_size, Some(10));
158 assert_eq!(config.adaptive_max_wait_ms, Some(500));
159 assert_eq!(config.adaptive_min_wait_ms, Some(10));
160 assert_eq!(config.adaptive_window_secs, Some(60));
161 assert_eq!(config.adaptive_enabled, Some(true));
162 }
163
164 #[test]
165 fn test_config_serialization() {
166 let config = HttpSourceConfig {
167 host: "localhost".to_string(),
168 port: 8080,
169 endpoint: Some("/data".to_string()),
170 timeout_ms: 15000,
171 adaptive_max_batch_size: Some(500),
172 adaptive_min_batch_size: Some(5),
173 adaptive_max_wait_ms: Some(1000),
174 adaptive_min_wait_ms: Some(50),
175 adaptive_window_secs: Some(30),
176 adaptive_enabled: Some(false),
177 };
178
179 let yaml = serde_yaml::to_string(&config).unwrap();
180 let deserialized: HttpSourceConfig = serde_yaml::from_str(&yaml).unwrap();
181 assert_eq!(config, deserialized);
182 }
183
184 #[test]
185 fn test_config_adaptive_batching_disabled() {
186 let yaml = r#"
187host: "localhost"
188port: 8080
189adaptive_enabled: false
190"#;
191 let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
192 assert_eq!(config.adaptive_enabled, Some(false));
193 }
194
195 #[test]
196 fn test_config_default_values() {
197 let config = HttpSourceConfig {
198 host: "localhost".to_string(),
199 port: 8080,
200 endpoint: None,
201 timeout_ms: default_timeout_ms(),
202 adaptive_max_batch_size: None,
203 adaptive_min_batch_size: None,
204 adaptive_max_wait_ms: None,
205 adaptive_min_wait_ms: None,
206 adaptive_window_secs: None,
207 adaptive_enabled: None,
208 };
209
210 assert_eq!(config.timeout_ms, 10000);
211 }
212
213 #[test]
214 fn test_config_port_range() {
215 let yaml = r#"
217host: "localhost"
218port: 65535
219"#;
220 let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
221 assert_eq!(config.port, 65535);
222
223 let yaml = r#"
225host: "localhost"
226port: 1
227"#;
228 let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
229 assert_eq!(config.port, 1);
230 }
231}