1use once_cell::sync::Lazy;
7use regex::Regex;
8
/// Upper bound on identifier length, compared against `str::len` (a byte count).
///
/// The same bound is applied to consumer group IDs and client IDs.
pub const MAX_IDENTIFIER_LENGTH: usize = 255;

/// Smallest length a topic name may have.
pub const MIN_TOPIC_NAME_LENGTH: usize = 1;

/// Upper bound on topic name length, compared against `str::len` (a byte count).
pub const MAX_TOPIC_NAME_LENGTH: usize = 255;

/// Default message size limit: 10 * 2^20 = 10_485_760.
///
/// NOTE(review): presumably a byte count (10 MiB) — confirm against the callers that use it.
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 10 * (1 << 20);
20
21static IDENTIFIER_REGEX: Lazy<Regex> =
23 Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]{0,254}$").unwrap());
24
25static TOPIC_NAME_REGEX: Lazy<Regex> =
28 Lazy::new(|| Regex::new(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,254}$").unwrap());
29
/// Reasons a value can be rejected by the `Validator` functions.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ValidationError {
    /// The identifier was the empty string.
    EmptyIdentifier,
    /// The identifier's `str::len` (`len`) exceeded the limit (`max`).
    IdentifierTooLong {
        len: usize,
        max: usize,
    },
    /// The identifier did not match the identifier pattern; `reason` describes the rule.
    InvalidIdentifier {
        name: String,
        reason: &'static str,
    },
    /// The topic name was the empty string.
    EmptyTopicName,
    /// The topic name's `str::len` (`len`) exceeded the limit (`max`).
    TopicNameTooLong {
        len: usize,
        max: usize,
    },
    /// The topic name broke a naming rule; `reason` says which one.
    InvalidTopicName {
        name: String,
        reason: &'static str,
    },
    /// A message of `size` was larger than the allowed `max`.
    MessageTooLarge {
        size: usize,
        max: usize,
    },
    /// `partition` was not below `max` (the exclusive upper bound).
    InvalidPartition {
        partition: u32,
        max: u32,
    },
    /// The consumer group ID was empty, too long, or had a disallowed shape.
    InvalidConsumerGroupId {
        id: String,
    },
    /// The client ID was empty, too long, or had a disallowed shape.
    InvalidClientId {
        id: String,
    },
}
54
55impl std::fmt::Display for ValidationError {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 match self {
58 Self::EmptyIdentifier => write!(f, "Identifier cannot be empty"),
59 Self::IdentifierTooLong { len, max } => {
60 write!(f, "Identifier too long: {} chars (max: {})", len, max)
61 }
62 Self::InvalidIdentifier { name, reason } => {
63 write!(f, "Invalid identifier '{}': {}", name, reason)
64 }
65 Self::EmptyTopicName => write!(f, "Topic name cannot be empty"),
66 Self::TopicNameTooLong { len, max } => {
67 write!(f, "Topic name too long: {} chars (max: {})", len, max)
68 }
69 Self::InvalidTopicName { name, reason } => {
70 write!(f, "Invalid topic name '{}': {}", name, reason)
71 }
72 Self::MessageTooLarge { size, max } => {
73 write!(f, "Message size {} exceeds maximum {}", size, max)
74 }
75 Self::InvalidPartition { partition, max } => {
76 write!(f, "Invalid partition {}: must be < {}", partition, max)
77 }
78 Self::InvalidConsumerGroupId { id } => {
79 write!(f, "Invalid consumer group ID: {}", id)
80 }
81 Self::InvalidClientId { id } => {
82 write!(f, "Invalid client ID: {}", id)
83 }
84 }
85 }
86}
87
// Empty impl: every `std::error::Error` method keeps its default, so
// `ValidationError` is usable wherever an `Error` is expected.
impl std::error::Error for ValidationError {}
89
/// Field-less type that groups the validation functions defined in `impl Validator`.
pub struct Validator;
92
93impl Validator {
94 pub fn validate_identifier(name: &str) -> Result<(), ValidationError> {
104 if name.is_empty() {
105 return Err(ValidationError::EmptyIdentifier);
106 }
107
108 if name.len() > MAX_IDENTIFIER_LENGTH {
109 return Err(ValidationError::IdentifierTooLong {
110 len: name.len(),
111 max: MAX_IDENTIFIER_LENGTH,
112 });
113 }
114
115 if !IDENTIFIER_REGEX.is_match(name) {
116 return Err(ValidationError::InvalidIdentifier {
117 name: name.to_string(),
118 reason: "must start with letter/underscore and contain only alphanumeric characters and underscores",
119 });
120 }
121
122 Ok(())
123 }
124
125 pub fn validate_topic_name(name: &str) -> Result<(), ValidationError> {
132 if name.is_empty() {
133 return Err(ValidationError::EmptyTopicName);
134 }
135
136 if name.len() > MAX_TOPIC_NAME_LENGTH {
137 return Err(ValidationError::TopicNameTooLong {
138 len: name.len(),
139 max: MAX_TOPIC_NAME_LENGTH,
140 });
141 }
142
143 if !TOPIC_NAME_REGEX.is_match(name) {
144 return Err(ValidationError::InvalidTopicName {
145 name: name.to_string(),
146 reason: "must start with alphanumeric and contain only alphanumeric, dots, hyphens, underscores",
147 });
148 }
149
150 if name.contains("..") {
152 return Err(ValidationError::InvalidTopicName {
153 name: name.to_string(),
154 reason: "path traversal patterns not allowed",
155 });
156 }
157
158 Ok(())
159 }
160
161 pub fn validate_message_size(size: usize, max: usize) -> Result<(), ValidationError> {
163 if size > max {
164 return Err(ValidationError::MessageTooLarge { size, max });
165 }
166 Ok(())
167 }
168
169 pub fn validate_partition(partition: u32, num_partitions: u32) -> Result<(), ValidationError> {
171 if partition >= num_partitions {
172 return Err(ValidationError::InvalidPartition {
173 partition,
174 max: num_partitions,
175 });
176 }
177 Ok(())
178 }
179
180 pub fn validate_consumer_group_id(id: &str) -> Result<(), ValidationError> {
184 if id.is_empty() || id.len() > MAX_IDENTIFIER_LENGTH {
185 return Err(ValidationError::InvalidConsumerGroupId { id: id.to_string() });
186 }
187
188 if !TOPIC_NAME_REGEX.is_match(id) {
189 return Err(ValidationError::InvalidConsumerGroupId { id: id.to_string() });
190 }
191
192 Ok(())
193 }
194
195 pub fn validate_client_id(id: &str) -> Result<(), ValidationError> {
199 if id.is_empty() || id.len() > MAX_IDENTIFIER_LENGTH {
200 return Err(ValidationError::InvalidClientId { id: id.to_string() });
201 }
202
203 if !TOPIC_NAME_REGEX.is_match(id) {
204 return Err(ValidationError::InvalidClientId { id: id.to_string() });
205 }
206
207 Ok(())
208 }
209
/// Produces a copy of `s` that is safe to embed in a single log line.
///
/// Every character for which `char::is_control` is true (newlines, tabs,
/// NUL, ...) is dropped, and at most `max_len` of the remaining characters
/// are kept. If at least one remaining character had to be cut off, `"..."`
/// is appended, so the result can be up to `max_len` characters plus the
/// three-character marker.
///
/// The limit is counted in characters, not bytes, and only characters that
/// survive filtering count towards it. Multi-byte input or input that merely
/// contains stripped control characters is therefore not reported as
/// truncated unless visible text was actually lost.
pub fn sanitize_for_log(s: &str, max_len: usize) -> String {
    let mut printable = s.chars().filter(|c| !c.is_control());

    // `by_ref` lets us keep using the iterator after `take` has pulled its share.
    let mut sanitized: String = printable.by_ref().take(max_len).collect();

    // Anything still left in the iterator is visible text that did not fit.
    if printable.next().is_some() {
        sanitized.push_str("...");
    }

    sanitized
}
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed identifiers are accepted.
    #[test]
    fn test_valid_identifiers() {
        let accepted = ["my_table", "MyTable123", "_private", "table_", "a"];

        for name in accepted {
            assert!(
                Validator::validate_identifier(name).is_ok(),
                "identifier {:?} should be accepted",
                name
            );
        }
    }

    /// Empty, badly shaped and over-long identifiers are rejected.
    #[test]
    fn test_invalid_identifiers() {
        let rejected = [
            // Empty.
            "",
            // Leading digit.
            "123table",
            // Characters outside letters, digits and underscore.
            "table-name",
            "table.name",
            "table;DROP",
            "table'; DROP TABLE users; --",
        ];

        for name in rejected {
            assert!(
                Validator::validate_identifier(name).is_err(),
                "identifier {:?} should be rejected",
                name
            );
        }

        // One byte past the limit.
        let too_long = "a".repeat(MAX_IDENTIFIER_LENGTH + 1);
        assert!(Validator::validate_identifier(&too_long).is_err());
    }

    /// Typical injection payloads never pass identifier validation.
    #[test]
    fn test_sql_injection_attempts() {
        let attacks = [
            "table'; DROP TABLE users; --",
            "table\"; DROP TABLE users; --",
            "table`; DROP TABLE users; --",
            "table/**/OR/**/1=1",
            "table%27",
            "table\0",
            "table\n",
            "../../../etc/passwd",
        ];

        attacks.iter().for_each(|attack| {
            assert!(
                Validator::validate_identifier(attack).is_err(),
                "Should reject SQL injection attempt: {}",
                attack
            );
        });
    }

    /// Topic names may use dashes, underscores and single dots.
    #[test]
    fn test_valid_topic_names() {
        let accepted = [
            "my-topic",
            "my_topic",
            "my.topic.name",
            "MyTopic123",
            "topic-with-many-dashes",
        ];

        for name in accepted {
            assert!(
                Validator::validate_topic_name(name).is_ok(),
                "topic name {:?} should be accepted",
                name
            );
        }
    }

    /// Empty names, bad leading characters, `..` and stray punctuation are rejected.
    #[test]
    fn test_invalid_topic_names() {
        let rejected = [
            // Empty.
            "",
            // Must start with a letter or digit.
            "-topic",
            "_topic",
            ".topic",
            // Consecutive dots.
            "../etc/passwd",
            "topic..name",
            // Disallowed character.
            "topic;DROP",
        ];

        for name in rejected {
            assert!(
                Validator::validate_topic_name(name).is_err(),
                "topic name {:?} should be rejected",
                name
            );
        }
    }

    /// Sizes up to and including the limit pass; anything above fails.
    #[test]
    fn test_message_size_validation() {
        let limit = 1024;
        let expectations = [(100, true), (1024, true), (1025, false)];

        for (size, accepted) in expectations {
            assert_eq!(
                Validator::validate_message_size(size, limit).is_ok(),
                accepted,
                "size {} against limit {}",
                size,
                limit
            );
        }
    }

    /// Only indices strictly below the partition count are valid.
    #[test]
    fn test_partition_validation() {
        let partition_count = 3;
        let expectations = [(0, true), (2, true), (3, false), (10, false)];

        for (partition, accepted) in expectations {
            assert_eq!(
                Validator::validate_partition(partition, partition_count).is_ok(),
                accepted,
                "partition {} of {}",
                partition,
                partition_count
            );
        }
    }

    /// Control characters are stripped and over-long input is cut with a marker.
    #[test]
    fn test_sanitize_for_log() {
        let cases = [
            // Short, clean input is returned unchanged.
            ("hello", 100, "hello"),
            // Input past the limit is cut and marked.
            ("hello world", 5, "hello..."),
            // Control characters disappear.
            ("hello\x00\x01\x02world", 100, "helloworld"),
            // Newlines are control characters too.
            ("hello\nworld", 100, "helloworld"),
        ];

        for (input, max_len, expected) in cases {
            assert_eq!(Validator::sanitize_for_log(input, max_len), expected);
        }
    }
}