Skip to main content

drasi_source_http/
config.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configuration for the HTTP source.
16//!
17//! The HTTP source receives data changes via HTTP endpoints.
18
19use serde::{Deserialize, Serialize};
20
21/// HTTP source configuration
22///
23/// This config only contains HTTP-specific settings.
24/// Bootstrap provider configuration (database, user, password, tables, etc.)
25/// should be provided via the source's generic properties map.
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub struct HttpSourceConfig {
28    /// HTTP host
29    pub host: String,
30
31    /// HTTP port
32    pub port: u16,
33
34    /// Optional endpoint path
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub endpoint: Option<String>,
37
38    /// Request timeout in milliseconds
39    #[serde(default = "default_timeout_ms")]
40    pub timeout_ms: u64,
41
42    /// Adaptive batching: maximum batch size
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub adaptive_max_batch_size: Option<usize>,
45
46    /// Adaptive batching: minimum batch size
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub adaptive_min_batch_size: Option<usize>,
49
50    /// Adaptive batching: maximum wait time in milliseconds
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub adaptive_max_wait_ms: Option<u64>,
53
54    /// Adaptive batching: minimum wait time in milliseconds
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub adaptive_min_wait_ms: Option<u64>,
57
58    /// Adaptive batching: throughput window in seconds
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub adaptive_window_secs: Option<u64>,
61
62    /// Whether adaptive batching is enabled
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub adaptive_enabled: Option<bool>,
65}
66
/// Serde default for `HttpSourceConfig::timeout_ms`: 10 000 milliseconds.
fn default_timeout_ms() -> u64 {
    const DEFAULT_TIMEOUT_MS: u64 = 10_000;
    DEFAULT_TIMEOUT_MS
}
70
71impl HttpSourceConfig {
72    /// Validate the configuration and return an error if invalid.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if:
77    /// - Port is 0 (invalid port)
78    /// - Timeout is 0 (would cause immediate timeouts)
79    /// - Adaptive batching min values exceed max values
80    pub fn validate(&self) -> anyhow::Result<()> {
81        if self.port == 0 {
82            return Err(anyhow::anyhow!(
83                "Validation error: port cannot be 0. \
84                 Please specify a valid port number (1-65535)"
85            ));
86        }
87
88        if self.timeout_ms == 0 {
89            return Err(anyhow::anyhow!(
90                "Validation error: timeout_ms cannot be 0. \
91                 Please specify a positive timeout value in milliseconds"
92            ));
93        }
94
95        // Validate adaptive batching settings
96        if let (Some(min), Some(max)) = (self.adaptive_min_batch_size, self.adaptive_max_batch_size)
97        {
98            if min > max {
99                return Err(anyhow::anyhow!(
100                    "Validation error: adaptive_min_batch_size ({min}) cannot be greater than \
101                     adaptive_max_batch_size ({max})"
102                ));
103            }
104        }
105
106        if let (Some(min), Some(max)) = (self.adaptive_min_wait_ms, self.adaptive_max_wait_ms) {
107            if min > max {
108                return Err(anyhow::anyhow!(
109                    "Validation error: adaptive_min_wait_ms ({min}) cannot be greater than \
110                     adaptive_max_wait_ms ({max})"
111                ));
112            }
113        }
114
115        Ok(())
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a configuration that passes `validate()`.
    ///
    /// Validation tests override a single field via struct-update syntax so
    /// each test exercises exactly one rule.
    fn valid_config() -> HttpSourceConfig {
        HttpSourceConfig {
            host: "localhost".to_string(),
            port: 8080,
            endpoint: None,
            timeout_ms: default_timeout_ms(),
            adaptive_max_batch_size: None,
            adaptive_min_batch_size: None,
            adaptive_max_wait_ms: None,
            adaptive_min_wait_ms: None,
            adaptive_window_secs: None,
            adaptive_enabled: None,
        }
    }

    #[test]
    fn test_config_deserialization_minimal() {
        let yaml = r#"
host: "localhost"
port: 8080
"#;
        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.host, "localhost");
        assert_eq!(config.port, 8080);
        assert_eq!(config.endpoint, None);
        assert_eq!(config.timeout_ms, 10000); // default
        assert_eq!(config.adaptive_enabled, None);
    }

    #[test]
    fn test_config_deserialization_full() {
        let yaml = r#"
host: "0.0.0.0"
port: 9000
endpoint: "/events"
timeout_ms: 5000
adaptive_max_batch_size: 1000
adaptive_min_batch_size: 10
adaptive_max_wait_ms: 500
adaptive_min_wait_ms: 10
adaptive_window_secs: 60
adaptive_enabled: true
"#;
        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.host, "0.0.0.0");
        assert_eq!(config.port, 9000);
        assert_eq!(config.endpoint, Some("/events".to_string()));
        assert_eq!(config.timeout_ms, 5000);
        assert_eq!(config.adaptive_max_batch_size, Some(1000));
        assert_eq!(config.adaptive_min_batch_size, Some(10));
        assert_eq!(config.adaptive_max_wait_ms, Some(500));
        assert_eq!(config.adaptive_min_wait_ms, Some(10));
        assert_eq!(config.adaptive_window_secs, Some(60));
        assert_eq!(config.adaptive_enabled, Some(true));
    }

    #[test]
    fn test_config_serialization() {
        let config = HttpSourceConfig {
            host: "localhost".to_string(),
            port: 8080,
            endpoint: Some("/data".to_string()),
            timeout_ms: 15000,
            adaptive_max_batch_size: Some(500),
            adaptive_min_batch_size: Some(5),
            adaptive_max_wait_ms: Some(1000),
            adaptive_min_wait_ms: Some(50),
            adaptive_window_secs: Some(30),
            adaptive_enabled: Some(false),
        };

        let yaml = serde_yaml::to_string(&config).unwrap();
        let deserialized: HttpSourceConfig = serde_yaml::from_str(&yaml).unwrap();
        assert_eq!(config, deserialized);
    }

    #[test]
    fn test_config_serialization_skips_unset_options() {
        // Optional fields carry `skip_serializing_if = "Option::is_none"`, so
        // a config with none of them set must not mention them at all.
        let yaml = serde_yaml::to_string(&valid_config()).unwrap();
        assert!(yaml.contains("host"));
        assert!(yaml.contains("port"));
        assert!(yaml.contains("timeout_ms"));
        assert!(!yaml.contains("endpoint"));
        assert!(!yaml.contains("adaptive_"));

        // And the trimmed output must still round-trip.
        let deserialized: HttpSourceConfig = serde_yaml::from_str(&yaml).unwrap();
        assert_eq!(deserialized, valid_config());
    }

    #[test]
    fn test_config_adaptive_batching_disabled() {
        let yaml = r#"
host: "localhost"
port: 8080
adaptive_enabled: false
"#;
        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.adaptive_enabled, Some(false));
    }

    #[test]
    fn test_config_default_values() {
        let config = HttpSourceConfig {
            host: "localhost".to_string(),
            port: 8080,
            endpoint: None,
            timeout_ms: default_timeout_ms(),
            adaptive_max_batch_size: None,
            adaptive_min_batch_size: None,
            adaptive_max_wait_ms: None,
            adaptive_min_wait_ms: None,
            adaptive_window_secs: None,
            adaptive_enabled: None,
        };

        assert_eq!(config.timeout_ms, 10000);
    }

    #[test]
    fn test_config_port_range() {
        // Test valid port
        let yaml = r#"
host: "localhost"
port: 65535
"#;
        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.port, 65535);

        // Test minimum port
        let yaml = r#"
host: "localhost"
port: 1
"#;
        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(config.port, 1);
    }

    #[test]
    fn test_validate_accepts_valid_config() {
        assert!(valid_config().validate().is_ok());
    }

    #[test]
    fn test_validate_rejects_zero_port() {
        let config = HttpSourceConfig {
            port: 0,
            ..valid_config()
        };
        let message = config.validate().unwrap_err().to_string();
        assert!(message.contains("port cannot be 0"));
    }

    #[test]
    fn test_validate_rejects_zero_timeout() {
        let config = HttpSourceConfig {
            timeout_ms: 0,
            ..valid_config()
        };
        let message = config.validate().unwrap_err().to_string();
        assert!(message.contains("timeout_ms cannot be 0"));
    }

    #[test]
    fn test_validate_rejects_inverted_batch_size_bounds() {
        let config = HttpSourceConfig {
            adaptive_min_batch_size: Some(100),
            adaptive_max_batch_size: Some(10),
            ..valid_config()
        };
        let message = config.validate().unwrap_err().to_string();
        assert!(message.contains("adaptive_min_batch_size (100)"));
        assert!(message.contains("adaptive_max_batch_size (10)"));
    }

    #[test]
    fn test_validate_rejects_inverted_wait_bounds() {
        let config = HttpSourceConfig {
            adaptive_min_wait_ms: Some(500),
            adaptive_max_wait_ms: Some(50),
            ..valid_config()
        };
        let message = config.validate().unwrap_err().to_string();
        assert!(message.contains("adaptive_min_wait_ms (500)"));
        assert!(message.contains("adaptive_max_wait_ms (50)"));
    }

    #[test]
    fn test_validate_accepts_equal_or_partial_bounds() {
        // Equal bounds are allowed: the check is strictly `min > max`.
        let equal = HttpSourceConfig {
            adaptive_min_batch_size: Some(10),
            adaptive_max_batch_size: Some(10),
            adaptive_min_wait_ms: Some(50),
            adaptive_max_wait_ms: Some(50),
            ..valid_config()
        };
        assert!(equal.validate().is_ok());

        // A bound without its counterpart is never compared.
        let only_min = HttpSourceConfig {
            adaptive_min_batch_size: Some(1_000_000),
            adaptive_min_wait_ms: Some(1_000_000),
            ..valid_config()
        };
        assert!(only_min.validate().is_ok());

        let only_max = HttpSourceConfig {
            adaptive_max_batch_size: Some(0),
            adaptive_max_wait_ms: Some(0),
            ..valid_config()
        };
        assert!(only_max.validate().is_ok());
    }
}