Skip to main content

rivven_core/
validation.rs

1//! Input validation for Rivven operations
2//!
3//! Provides security validation for identifiers, limits, and configuration.
4//! Used to prevent injection attacks, path traversal, and resource exhaustion.
5
6use once_cell::sync::Lazy;
7use regex::Regex;
8
/// Maximum allowed identifier length, compared against `str::len()` (i.e. bytes).
///
/// NOTE(review): the previous comment said 255 "matches PostgreSQL", but
/// PostgreSQL's default identifier limit is 63 bytes (`NAMEDATALEN - 1`).
/// Confirm that 255 is intended wherever these identifiers reach PostgreSQL.
pub const MAX_IDENTIFIER_LENGTH: usize = 255;

/// Default maximum message size: 10 MiB (10 * 1024 * 1024 bytes).
///
/// `Validator::validate_message_size` takes the limit as an argument, so callers
/// may pass this default or their own configured value.
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

/// Minimum topic name length.
///
/// NOTE(review): not referenced in the visible part of this file; the topic
/// validator rejects empty names via `is_empty()` instead.
pub const MIN_TOPIC_NAME_LENGTH: usize = 1;

/// Maximum topic name length, compared against `str::len()` (i.e. bytes).
pub const MAX_TOPIC_NAME_LENGTH: usize = 255;
20
21/// Regex for validating SQL-style identifiers
22static IDENTIFIER_REGEX: Lazy<Regex> =
23    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]{0,254}$").unwrap());
24
25/// Regex for validating topic names (allows dots, hyphens, underscores)
26/// Matches Kafka topic naming conventions
27static TOPIC_NAME_REGEX: Lazy<Regex> =
28    Lazy::new(|| Regex::new(r"^[a-zA-Z0-9][a-zA-Z0-9._-]{0,254}$").unwrap());
29
/// Reasons an input can be rejected by the validation helpers in this module.
///
/// Length fields (`len`, `size`) carry the value that was measured and `max`
/// carries the limit it was compared against.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationError {
    /// The identifier was the empty string.
    EmptyIdentifier,
    /// The identifier's byte length `len` exceeds `max`.
    IdentifierTooLong { len: usize, max: usize },
    /// The identifier `name` has a disallowed shape; `reason` says which rule failed.
    InvalidIdentifier { name: String, reason: &'static str },
    /// The topic name was the empty string.
    EmptyTopicName,
    /// The topic name's byte length `len` exceeds `max`.
    TopicNameTooLong { len: usize, max: usize },
    /// The topic name `name` has a disallowed shape; `reason` says which rule failed.
    InvalidTopicName { name: String, reason: &'static str },
    /// A message of `size` bytes is larger than the permitted `max`.
    MessageTooLarge { size: usize, max: usize },
    /// `partition` is not a valid index; valid indices are strictly below `max`.
    InvalidPartition { partition: u32, max: u32 },
    /// The consumer group ID `id` was rejected (empty, too long, or bad characters).
    InvalidConsumerGroupId { id: String },
    /// The client ID `id` was rejected (empty, too long, or bad characters).
    InvalidClientId { id: String },
}

impl std::fmt::Display for ValidationError {
    /// Render a human-readable, single-line description of the error.
    ///
    /// The offending name/ID is embedded verbatim, without sanitization.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed messages need no formatting machinery.
            Self::EmptyIdentifier => f.write_str("Identifier cannot be empty"),
            Self::EmptyTopicName => f.write_str("Topic name cannot be empty"),

            // Length / size limits.
            Self::IdentifierTooLong { len, max } => {
                write!(f, "Identifier too long: {} chars (max: {})", len, max)
            }
            Self::TopicNameTooLong { len, max } => {
                write!(f, "Topic name too long: {} chars (max: {})", len, max)
            }
            Self::MessageTooLarge { size, max } => {
                write!(f, "Message size {} exceeds maximum {}", size, max)
            }
            Self::InvalidPartition { partition, max } => {
                write!(f, "Invalid partition {}: must be < {}", partition, max)
            }

            // Malformed names and IDs.
            Self::InvalidIdentifier { name, reason } => {
                write!(f, "Invalid identifier '{}': {}", name, reason)
            }
            Self::InvalidTopicName { name, reason } => {
                write!(f, "Invalid topic name '{}': {}", name, reason)
            }
            Self::InvalidConsumerGroupId { id } => write!(f, "Invalid consumer group ID: {}", id),
            Self::InvalidClientId { id } => write!(f, "Invalid client ID: {}", id),
        }
    }
}

impl std::error::Error for ValidationError {}
89
/// Validator for Rivven inputs.
///
/// A unit struct used purely as a namespace: every check is an associated
/// function (no `self`), so no instance is needed, e.g.
/// `Validator::validate_topic_name("orders")`.
pub struct Validator;
92
93impl Validator {
94    /// Validate a SQL-style identifier (slot names, internal identifiers)
95    ///
96    /// # Security
97    ///
98    /// Prevents SQL injection by rejecting:
99    /// - Empty strings
100    /// - Strings starting with numbers
101    /// - Special characters (quotes, semicolons, etc.)
102    /// - Excessively long strings
103    pub fn validate_identifier(name: &str) -> Result<(), ValidationError> {
104        if name.is_empty() {
105            return Err(ValidationError::EmptyIdentifier);
106        }
107
108        if name.len() > MAX_IDENTIFIER_LENGTH {
109            return Err(ValidationError::IdentifierTooLong {
110                len: name.len(),
111                max: MAX_IDENTIFIER_LENGTH,
112            });
113        }
114
115        if !IDENTIFIER_REGEX.is_match(name) {
116            return Err(ValidationError::InvalidIdentifier {
117                name: name.to_string(),
118                reason: "must start with letter/underscore and contain only alphanumeric characters and underscores",
119            });
120        }
121
122        Ok(())
123    }
124
125    /// Validate a topic name
126    ///
127    /// Topic names follow Kafka conventions:
128    /// - Must start with alphanumeric character
129    /// - Can contain alphanumeric, dots, hyphens, and underscores
130    /// - Maximum 255 characters
131    pub fn validate_topic_name(name: &str) -> Result<(), ValidationError> {
132        if name.is_empty() {
133            return Err(ValidationError::EmptyTopicName);
134        }
135
136        if name.len() > MAX_TOPIC_NAME_LENGTH {
137            return Err(ValidationError::TopicNameTooLong {
138                len: name.len(),
139                max: MAX_TOPIC_NAME_LENGTH,
140            });
141        }
142
143        if !TOPIC_NAME_REGEX.is_match(name) {
144            return Err(ValidationError::InvalidTopicName {
145                name: name.to_string(),
146                reason: "must start with alphanumeric and contain only alphanumeric, dots, hyphens, underscores",
147            });
148        }
149
150        // Additional security checks for common attack patterns
151        if name.contains("..") {
152            return Err(ValidationError::InvalidTopicName {
153                name: name.to_string(),
154                reason: "path traversal patterns not allowed",
155            });
156        }
157
158        Ok(())
159    }
160
161    /// Validate message size
162    pub fn validate_message_size(size: usize, max: usize) -> Result<(), ValidationError> {
163        if size > max {
164            return Err(ValidationError::MessageTooLarge { size, max });
165        }
166        Ok(())
167    }
168
169    /// Validate partition number
170    pub fn validate_partition(partition: u32, num_partitions: u32) -> Result<(), ValidationError> {
171        if partition >= num_partitions {
172            return Err(ValidationError::InvalidPartition {
173                partition,
174                max: num_partitions,
175            });
176        }
177        Ok(())
178    }
179
180    /// Validate consumer group ID
181    ///
182    /// Uses the same rules as topic names
183    pub fn validate_consumer_group_id(id: &str) -> Result<(), ValidationError> {
184        if id.is_empty() || id.len() > MAX_IDENTIFIER_LENGTH {
185            return Err(ValidationError::InvalidConsumerGroupId { id: id.to_string() });
186        }
187
188        if !TOPIC_NAME_REGEX.is_match(id) {
189            return Err(ValidationError::InvalidConsumerGroupId { id: id.to_string() });
190        }
191
192        Ok(())
193    }
194
195    /// Validate client ID
196    ///
197    /// Uses the same rules as topic names
198    pub fn validate_client_id(id: &str) -> Result<(), ValidationError> {
199        if id.is_empty() || id.len() > MAX_IDENTIFIER_LENGTH {
200            return Err(ValidationError::InvalidClientId { id: id.to_string() });
201        }
202
203        if !TOPIC_NAME_REGEX.is_match(id) {
204            return Err(ValidationError::InvalidClientId { id: id.to_string() });
205        }
206
207        Ok(())
208    }
209
210    /// Sanitize a string for safe logging (remove control characters, truncate)
211    pub fn sanitize_for_log(s: &str, max_len: usize) -> String {
212        let sanitized: String = s
213            .chars()
214            .filter(|c| !c.is_control())
215            .take(max_len)
216            .collect();
217
218        if s.len() > max_len {
219            format!("{}...", sanitized)
220        } else {
221            sanitized
222        }
223    }
224}
225
// Unit tests for the validators. Besides the happy/unhappy paths, these pin the
// exact length boundaries, the error variants returned, and the consumer-group /
// client-ID validators.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_valid_identifiers() {
        assert!(Validator::validate_identifier("my_table").is_ok());
        assert!(Validator::validate_identifier("MyTable123").is_ok());
        assert!(Validator::validate_identifier("_private").is_ok());
        assert!(Validator::validate_identifier("table_").is_ok());
        assert!(Validator::validate_identifier("a").is_ok());
    }

    #[test]
    fn test_invalid_identifiers() {
        // Empty
        assert!(Validator::validate_identifier("").is_err());

        // Starts with number
        assert!(Validator::validate_identifier("123table").is_err());

        // Contains special characters
        assert!(Validator::validate_identifier("table-name").is_err());
        assert!(Validator::validate_identifier("table.name").is_err());
        assert!(Validator::validate_identifier("table;DROP").is_err());
        assert!(Validator::validate_identifier("table'; DROP TABLE users; --").is_err());

        // Too long
        let long_name = "a".repeat(MAX_IDENTIFIER_LENGTH + 1);
        assert!(Validator::validate_identifier(&long_name).is_err());
    }

    #[test]
    fn test_identifier_length_boundary_and_error_variants() {
        // Exactly at the limit is accepted.
        let at_limit = "a".repeat(MAX_IDENTIFIER_LENGTH);
        assert!(Validator::validate_identifier(&at_limit).is_ok());

        // One byte over is rejected with the measured length.
        let over_limit = "a".repeat(MAX_IDENTIFIER_LENGTH + 1);
        assert_eq!(
            Validator::validate_identifier(&over_limit),
            Err(ValidationError::IdentifierTooLong {
                len: MAX_IDENTIFIER_LENGTH + 1,
                max: MAX_IDENTIFIER_LENGTH,
            })
        );

        assert_eq!(
            Validator::validate_identifier(""),
            Err(ValidationError::EmptyIdentifier)
        );
        assert!(matches!(
            Validator::validate_identifier("123table"),
            Err(ValidationError::InvalidIdentifier { .. })
        ));
    }

    #[test]
    fn test_sql_injection_attempts() {
        let attacks = [
            "table'; DROP TABLE users; --",
            "table\"; DROP TABLE users; --",
            "table`; DROP TABLE users; --",
            "table/**/OR/**/1=1",
            "table%27",
            "table\0",
            "table\n",
            "../../../etc/passwd",
        ];

        for attack in attacks {
            assert!(
                Validator::validate_identifier(attack).is_err(),
                "Should reject SQL injection attempt: {}",
                attack
            );
        }
    }

    #[test]
    fn test_valid_topic_names() {
        assert!(Validator::validate_topic_name("my-topic").is_ok());
        assert!(Validator::validate_topic_name("my_topic").is_ok());
        assert!(Validator::validate_topic_name("my.topic.name").is_ok());
        assert!(Validator::validate_topic_name("MyTopic123").is_ok());
        assert!(Validator::validate_topic_name("topic-with-many-dashes").is_ok());
    }

    #[test]
    fn test_invalid_topic_names() {
        // Empty
        assert!(Validator::validate_topic_name("").is_err());

        // Starts with invalid character
        assert!(Validator::validate_topic_name("-topic").is_err());
        assert!(Validator::validate_topic_name("_topic").is_err());
        assert!(Validator::validate_topic_name(".topic").is_err());

        // Path traversal
        assert!(Validator::validate_topic_name("../etc/passwd").is_err());
        assert!(Validator::validate_topic_name("topic..name").is_err());

        // Special characters
        assert!(Validator::validate_topic_name("topic;DROP").is_err());
    }

    #[test]
    fn test_topic_name_length_boundary_and_error_variants() {
        let at_limit = "a".repeat(MAX_TOPIC_NAME_LENGTH);
        assert!(Validator::validate_topic_name(&at_limit).is_ok());

        let over_limit = "a".repeat(MAX_TOPIC_NAME_LENGTH + 1);
        assert_eq!(
            Validator::validate_topic_name(&over_limit),
            Err(ValidationError::TopicNameTooLong {
                len: MAX_TOPIC_NAME_LENGTH + 1,
                max: MAX_TOPIC_NAME_LENGTH,
            })
        );

        assert_eq!(
            Validator::validate_topic_name(""),
            Err(ValidationError::EmptyTopicName)
        );

        // A name with a valid character set but a `..` sequence reports the
        // traversal-specific reason.
        assert_eq!(
            Validator::validate_topic_name("topic..name"),
            Err(ValidationError::InvalidTopicName {
                name: "topic..name".to_string(),
                reason: "path traversal patterns not allowed",
            })
        );
    }

    #[test]
    fn test_message_size_validation() {
        let max = 1024;
        assert!(Validator::validate_message_size(100, max).is_ok());
        assert!(Validator::validate_message_size(1024, max).is_ok());
        assert!(Validator::validate_message_size(1025, max).is_err());

        // Zero-sized messages are fine even with a zero limit.
        assert!(Validator::validate_message_size(0, 0).is_ok());
        assert_eq!(
            Validator::validate_message_size(1025, max),
            Err(ValidationError::MessageTooLarge { size: 1025, max: 1024 })
        );
    }

    #[test]
    fn test_partition_validation() {
        assert!(Validator::validate_partition(0, 3).is_ok());
        assert!(Validator::validate_partition(2, 3).is_ok());
        assert!(Validator::validate_partition(3, 3).is_err());
        assert!(Validator::validate_partition(10, 3).is_err());

        // With no partitions, every index is invalid.
        assert_eq!(
            Validator::validate_partition(0, 0),
            Err(ValidationError::InvalidPartition { partition: 0, max: 0 })
        );
    }

    #[test]
    fn test_consumer_group_id_validation() {
        assert!(Validator::validate_consumer_group_id("my-group").is_ok());
        assert!(Validator::validate_consumer_group_id("group.v1").is_ok());
        assert!(Validator::validate_consumer_group_id("Group_123").is_ok());
        assert!(Validator::validate_consumer_group_id(&"a".repeat(MAX_IDENTIFIER_LENGTH)).is_ok());

        assert_eq!(
            Validator::validate_consumer_group_id(""),
            Err(ValidationError::InvalidConsumerGroupId { id: String::new() })
        );
        assert_eq!(
            Validator::validate_consumer_group_id("group id"),
            Err(ValidationError::InvalidConsumerGroupId {
                id: "group id".to_string()
            })
        );
        assert!(Validator::validate_consumer_group_id("-group").is_err());
        assert!(Validator::validate_consumer_group_id("group;DROP").is_err());
        assert!(
            Validator::validate_consumer_group_id(&"a".repeat(MAX_IDENTIFIER_LENGTH + 1)).is_err()
        );
    }

    #[test]
    fn test_client_id_validation() {
        assert!(Validator::validate_client_id("client-1").is_ok());
        assert!(Validator::validate_client_id("client.v1").is_ok());
        assert!(Validator::validate_client_id("Client_123").is_ok());
        assert!(Validator::validate_client_id(&"a".repeat(MAX_IDENTIFIER_LENGTH)).is_ok());

        assert_eq!(
            Validator::validate_client_id(""),
            Err(ValidationError::InvalidClientId { id: String::new() })
        );
        assert_eq!(
            Validator::validate_client_id("client id"),
            Err(ValidationError::InvalidClientId {
                id: "client id".to_string()
            })
        );
        assert!(Validator::validate_client_id("_client").is_err());
        assert!(Validator::validate_client_id("client\n").is_err());
        assert!(Validator::validate_client_id(&"a".repeat(MAX_IDENTIFIER_LENGTH + 1)).is_err());
    }

    #[test]
    fn test_error_display() {
        assert_eq!(
            ValidationError::EmptyTopicName.to_string(),
            "Topic name cannot be empty"
        );
        assert_eq!(
            ValidationError::InvalidPartition { partition: 3, max: 3 }.to_string(),
            "Invalid partition 3: must be < 3"
        );
        assert_eq!(
            ValidationError::MessageTooLarge { size: 2048, max: 1024 }.to_string(),
            "Message size 2048 exceeds maximum 1024"
        );
    }

    #[test]
    fn test_sanitize_for_log() {
        // Normal string
        assert_eq!(Validator::sanitize_for_log("hello", 100), "hello");

        // Truncation
        assert_eq!(Validator::sanitize_for_log("hello world", 5), "hello...");

        // Control characters removed
        let with_control = "hello\x00\x01\x02world";
        assert_eq!(Validator::sanitize_for_log(with_control, 100), "helloworld");

        // Newlines removed
        let with_newlines = "hello\nworld";
        assert_eq!(
            Validator::sanitize_for_log(with_newlines, 100),
            "helloworld"
        );
    }

    #[test]
    fn test_sanitize_for_log_truncates_on_char_boundaries() {
        // The limit counts characters, so multi-byte text is cut cleanly.
        assert_eq!(Validator::sanitize_for_log("héllo wörld", 5), "héllo...");
    }
}