Skip to main content

synapse_pingora/
body.rs

1//! Request/Response Body Inspection Module
2//!
3//! Provides functionality for inspecting HTTP request and response bodies,
4//! including content-type detection, parsing, and anomaly detection.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9use thiserror::Error;
10use tracing::{debug, instrument};
11
12/// Errors that can occur during body inspection
13#[derive(Debug, Error)]
14pub enum BodyError {
15    #[error("payload too large: {size} bytes exceeds limit of {limit} bytes")]
16    PayloadTooLarge { size: usize, limit: usize },
17
18    #[error("parse error: {message}")]
19    ParseError {
20        message: String,
21        content_type: ContentType,
22    },
23
24    #[error("inspection timeout after {elapsed:?}")]
25    Timeout { elapsed: Duration, limit: Duration },
26
27    #[error("max parse depth exceeded: {depth} > {limit}")]
28    MaxDepthExceeded { depth: usize, limit: usize },
29}
30
31pub type BodyResult<T> = Result<T, BodyError>;
32
33/// Detected content type of HTTP body
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
35#[serde(rename_all = "snake_case")]
36pub enum ContentType {
37    Json,
38    Xml,
39    FormUrlencoded,
40    Multipart,
41    PlainText,
42    Html,
43    Binary,
44    #[default]
45    Unknown,
46}
47
48impl ContentType {
49    pub fn from_header(header: &str) -> Self {
50        let lower = header.to_lowercase();
51        let mime = lower.split(';').next().unwrap_or("").trim();
52        match mime {
53            "application/json" | "text/json" => Self::Json,
54            "application/xml" | "text/xml" => Self::Xml,
55            "application/x-www-form-urlencoded" => Self::FormUrlencoded,
56            m if m.starts_with("multipart/") => Self::Multipart,
57            "text/plain" => Self::PlainText,
58            "text/html" => Self::Html,
59            "application/octet-stream" => Self::Binary,
60            _ => Self::Unknown,
61        }
62    }
63
64    pub fn detect_from_body(body: &[u8]) -> Self {
65        if body.is_empty() {
66            return Self::Unknown;
67        }
68        let trimmed: Vec<u8> = body
69            .iter()
70            .skip_while(|&&b| b.is_ascii_whitespace())
71            .copied()
72            .collect();
73        if trimmed.is_empty() {
74            return Self::Unknown;
75        }
76        let first = trimmed[0];
77        if first == b'{' || first == b'[' {
78            return Self::Json;
79        }
80        if first == b'<' {
81            if let Ok(s) = std::str::from_utf8(&trimmed) {
82                let lower = s.to_lowercase();
83                if lower.starts_with("<!doctype html") || lower.starts_with("<html") {
84                    return Self::Html;
85                }
86                return Self::Xml;
87            }
88        }
89        if let Ok(s) = std::str::from_utf8(body) {
90            if s.contains('=') && (s.contains('&') || !s.contains(' ')) {
91                return Self::FormUrlencoded;
92            }
93            return Self::PlainText;
94        }
95        Self::Binary
96    }
97
98    pub const fn is_text(&self) -> bool {
99        matches!(
100            self,
101            Self::Json | Self::Xml | Self::FormUrlencoded | Self::PlainText | Self::Html
102        )
103    }
104}
105
106impl std::fmt::Display for ContentType {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        match self {
109            Self::Json => write!(f, "application/json"),
110            Self::Xml => write!(f, "application/xml"),
111            Self::FormUrlencoded => write!(f, "application/x-www-form-urlencoded"),
112            Self::Multipart => write!(f, "multipart/form-data"),
113            Self::PlainText => write!(f, "text/plain"),
114            Self::Html => write!(f, "text/html"),
115            Self::Binary => write!(f, "application/octet-stream"),
116            Self::Unknown => write!(f, "unknown"),
117        }
118    }
119}
120
121/// Parsed body structure
122#[derive(Debug, Clone, Serialize, Deserialize)]
123#[serde(tag = "type", content = "data")]
124pub enum ParsedBody {
125    Json(serde_json::Value),
126    Form(HashMap<String, Vec<String>>),
127    Text(String),
128    Binary { size: usize, hash: String },
129}
130
131/// Detected anomaly in body content
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct BodyAnomaly {
134    pub anomaly_type: AnomalyType,
135    pub severity: f32,
136    pub description: String,
137}
138
139#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
140#[serde(rename_all = "snake_case")]
141pub enum AnomalyType {
142    OversizedPayload,
143    MalformedContent,
144    ContentTypeMismatch,
145    NullBytesInText,
146    ControlCharacters,
147    DuplicateKeys,
148}
149
150impl BodyAnomaly {
151    pub fn new(anomaly_type: AnomalyType, severity: f32, description: impl Into<String>) -> Self {
152        Self {
153            anomaly_type,
154            severity: severity.clamp(0.0, 1.0),
155            description: description.into(),
156        }
157    }
158}
159
160/// Configuration for body inspection
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct BodyConfig {
163    pub max_body_size: usize,
164    pub max_parse_depth: usize,
165    pub timeout: Duration,
166    pub detect_anomalies: bool,
167    pub large_payload_threshold: usize,
168}
169
170impl Default for BodyConfig {
171    fn default() -> Self {
172        Self {
173            max_body_size: 10 * 1024 * 1024,
174            max_parse_depth: 32,
175            timeout: Duration::from_secs(5),
176            detect_anomalies: true,
177            large_payload_threshold: 1024 * 1024,
178        }
179    }
180}
181
182/// Result of body inspection
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct InspectionResult {
185    pub content_type: ContentType,
186    pub declared_content_type: Option<ContentType>,
187    pub body_size: usize,
188    pub parsed_structure: Option<ParsedBody>,
189    pub anomalies: Vec<BodyAnomaly>,
190    pub processing_time: Duration,
191    pub parse_success: bool,
192    pub parse_error: Option<String>,
193}
194
195impl InspectionResult {
196    pub fn has_anomalies(&self) -> bool {
197        !self.anomalies.is_empty()
198    }
199
200    pub fn max_severity(&self) -> f32 {
201        self.anomalies
202            .iter()
203            .map(|a| a.severity)
204            .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
205            .unwrap_or(0.0)
206    }
207}
208
209/// Main body inspection engine
210#[derive(Debug)]
211pub struct BodyInspector {
212    config: BodyConfig,
213}
214
215impl BodyInspector {
216    pub fn new(config: BodyConfig) -> Self {
217        Self { config }
218    }
219
220    #[instrument(skip(self, body), fields(body_len = body.len()))]
221    pub fn inspect(
222        &self,
223        body: &[u8],
224        content_type_header: Option<&str>,
225    ) -> BodyResult<InspectionResult> {
226        let start = Instant::now();
227        if body.len() > self.config.max_body_size {
228            return Err(BodyError::PayloadTooLarge {
229                size: body.len(),
230                limit: self.config.max_body_size,
231            });
232        }
233
234        let declared = content_type_header.map(ContentType::from_header);
235        let detected = ContentType::detect_from_body(body);
236        let content_type = declared.unwrap_or(detected);
237
238        let (parsed, parse_success, parse_error) = self.parse_body(body, content_type);
239        let mut anomalies = Vec::new();
240        if self.config.detect_anomalies {
241            self.detect_anomalies(body, content_type, declared, detected, &mut anomalies);
242        }
243
244        debug!(
245            ?content_type,
246            body_size = body.len(),
247            "body inspection complete"
248        );
249        Ok(InspectionResult {
250            content_type,
251            declared_content_type: declared,
252            body_size: body.len(),
253            parsed_structure: parsed,
254            anomalies,
255            processing_time: start.elapsed(),
256            parse_success,
257            parse_error,
258        })
259    }
260
261    fn parse_body(
262        &self,
263        body: &[u8],
264        content_type: ContentType,
265    ) -> (Option<ParsedBody>, bool, Option<String>) {
266        if body.is_empty() {
267            return (None, true, None);
268        }
269        match content_type {
270            ContentType::Json => self.parse_json(body),
271            ContentType::FormUrlencoded => self.parse_form(body),
272            ContentType::PlainText | ContentType::Html => self.parse_text(body),
273            _ => (Some(self.parse_binary(body)), true, None),
274        }
275    }
276
277    fn parse_json(&self, body: &[u8]) -> (Option<ParsedBody>, bool, Option<String>) {
278        let text = match std::str::from_utf8(body) {
279            Ok(s) => s,
280            Err(e) => return (None, false, Some(e.to_string())),
281        };
282
283        // Parse with depth limit to prevent stack overflow from deeply nested payloads
284        match self.parse_json_with_depth_limit(text, self.config.max_parse_depth) {
285            Ok(value) => (Some(ParsedBody::Json(value)), true, None),
286            Err(e) => (None, false, Some(e)),
287        }
288    }
289
290    /// Parse JSON with a maximum nesting depth limit.
291    ///
292    /// This prevents stack overflow attacks from payloads with extreme nesting depth.
293    fn parse_json_with_depth_limit(
294        &self,
295        text: &str,
296        max_depth: usize,
297    ) -> Result<serde_json::Value, String> {
298        use serde_json::Value;
299
300        let value: Value = serde_json::from_str(text).map_err(|e| e.to_string())?;
301
302        // Check depth after parsing (serde_json has a default recursion limit of 128,
303        // but we enforce a stricter limit for security)
304        if self.check_json_depth(&value, 0, max_depth) {
305            Ok(value)
306        } else {
307            Err(format!("JSON nesting depth exceeds limit of {}", max_depth))
308        }
309    }
310
311    /// Recursively check if JSON depth exceeds the limit.
312    fn check_json_depth(
313        &self,
314        value: &serde_json::Value,
315        current_depth: usize,
316        max_depth: usize,
317    ) -> bool {
318        if current_depth > max_depth {
319            return false;
320        }
321
322        match value {
323            serde_json::Value::Array(arr) => arr
324                .iter()
325                .all(|v| self.check_json_depth(v, current_depth + 1, max_depth)),
326            serde_json::Value::Object(obj) => obj
327                .values()
328                .all(|v| self.check_json_depth(v, current_depth + 1, max_depth)),
329            _ => true,
330        }
331    }
332
333    fn parse_form(&self, body: &[u8]) -> (Option<ParsedBody>, bool, Option<String>) {
334        let text = match std::str::from_utf8(body) {
335            Ok(s) => s,
336            Err(e) => return (None, false, Some(e.to_string())),
337        };
338        let mut form: HashMap<String, Vec<String>> = HashMap::new();
339        for pair in text.split('&') {
340            if pair.is_empty() {
341                continue;
342            }
343            let (key, value) = match pair.split_once('=') {
344                Some((k, v)) => (k, v),
345                None => (pair, ""),
346            };
347            form.entry(key.to_string())
348                .or_default()
349                .push(value.to_string());
350        }
351        (Some(ParsedBody::Form(form)), true, None)
352    }
353
354    fn parse_text(&self, body: &[u8]) -> (Option<ParsedBody>, bool, Option<String>) {
355        match std::str::from_utf8(body) {
356            Ok(s) => (Some(ParsedBody::Text(s.to_string())), true, None),
357            Err(e) => (None, false, Some(e.to_string())),
358        }
359    }
360
361    fn parse_binary(&self, body: &[u8]) -> ParsedBody {
362        use std::collections::hash_map::DefaultHasher;
363        use std::hash::{Hash, Hasher};
364        let mut hasher = DefaultHasher::new();
365        body.hash(&mut hasher);
366        ParsedBody::Binary {
367            size: body.len(),
368            hash: format!("{:016x}", hasher.finish()),
369        }
370    }
371
372    fn detect_anomalies(
373        &self,
374        body: &[u8],
375        content_type: ContentType,
376        declared: Option<ContentType>,
377        detected: ContentType,
378        anomalies: &mut Vec<BodyAnomaly>,
379    ) {
380        if body.len() > self.config.large_payload_threshold {
381            anomalies.push(BodyAnomaly::new(
382                AnomalyType::OversizedPayload,
383                0.3,
384                "large payload",
385            ));
386        }
387        if let Some(decl) = declared {
388            if decl != detected && detected != ContentType::Unknown {
389                anomalies.push(BodyAnomaly::new(
390                    AnomalyType::ContentTypeMismatch,
391                    0.6,
392                    "content type mismatch",
393                ));
394            }
395        }
396        if content_type.is_text() && body.contains(&0u8) {
397            anomalies.push(BodyAnomaly::new(
398                AnomalyType::NullBytesInText,
399                0.8,
400                "null bytes in text",
401            ));
402        }
403    }
404}
405
406impl Default for BodyInspector {
407    fn default() -> Self {
408        Self::new(BodyConfig::default())
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;

    const JSON_HEADER: &str = "application/json";
    const FORM_HEADER: &str = "application/x-www-form-urlencoded";

    /// Inspector with default settings except for the JSON depth limit.
    fn inspector_with_depth(max_parse_depth: usize) -> BodyInspector {
        BodyInspector::new(BodyConfig {
            max_parse_depth,
            ..BodyConfig::default()
        })
    }

    /// Inspects `body` as JSON, panicking if inspection itself errors.
    fn inspect_json(inspector: &BodyInspector, body: &[u8]) -> InspectionResult {
        inspector.inspect(body, Some(JSON_HEADER)).unwrap()
    }

    /// Inspects `body` as a URL-encoded form with a default inspector.
    fn inspect_form(body: &[u8]) -> InspectionResult {
        BodyInspector::default()
            .inspect(body, Some(FORM_HEADER))
            .unwrap()
    }

    /// Extracts the form map from a result, panicking on any other shape.
    fn into_form(result: InspectionResult) -> std::collections::HashMap<String, Vec<String>> {
        match result.parsed_structure.unwrap() {
            ParsedBody::Form(form) => form,
            other => panic!("expected ParsedBody::Form, got {:?}", other),
        }
    }

    #[test]
    fn test_content_type_detection() {
        assert_eq!(ContentType::from_header(JSON_HEADER), ContentType::Json);
        assert_eq!(ContentType::from_header("text/html"), ContentType::Html);
        assert_eq!(
            ContentType::detect_from_body(br#"{"key": "value"}"#),
            ContentType::Json
        );
        assert_eq!(ContentType::detect_from_body(b"<html>"), ContentType::Html);
    }

    #[test]
    fn test_inspector_json() {
        let inspector = BodyInspector::default();
        let result = inspect_json(&inspector, br#"{"test": "value"}"#);
        assert_eq!(result.content_type, ContentType::Json);
        assert!(result.parse_success);
    }

    #[test]
    fn test_inspector_size_limit() {
        let inspector = BodyInspector::new(BodyConfig {
            max_body_size: 10,
            ..BodyConfig::default()
        });
        let outcome = inspector.inspect(b"this is way too large", None);
        assert!(matches!(outcome, Err(BodyError::PayloadTooLarge { .. })));
    }

    #[test]
    fn test_json_depth_limit_within_limit() {
        let inspector = inspector_with_depth(4);

        // Three nested objects stay under a limit of 4.
        let result = inspect_json(&inspector, br#"{"a": {"b": {"c": "value"}}}"#);
        assert!(result.parse_success);
    }

    #[test]
    fn test_json_depth_limit_exceeded() {
        let inspector = inspector_with_depth(2);

        // Three nested objects exceed a limit of 2.
        let result = inspect_json(&inspector, br#"{"a": {"b": {"c": "value"}}}"#);
        assert!(!result.parse_success);
        assert!(result.parse_error.unwrap().contains("depth"));
    }

    #[test]
    fn test_json_array_depth_limit() {
        let inspector = inspector_with_depth(3);

        // Four nested arrays exceed a limit of 3.
        let result = inspect_json(&inspector, br#"[[[[1]]]]"#);
        assert!(!result.parse_success);
    }

    #[test]
    fn test_form_urlencoded_duplicate_keys() {
        let result = inspect_form(b"name=alice&name=bob");

        assert_eq!(result.content_type, ContentType::FormUrlencoded);
        assert!(result.parse_success);

        let form = into_form(result);
        let names = form.get("name").expect("key 'name' should exist");
        assert_eq!(*names, ["alice", "bob"]);
    }

    #[test]
    fn test_form_urlencoded_single_key() {
        let result = inspect_form(b"key=value");
        assert!(result.parse_success);

        let form = into_form(result);
        assert_eq!(*form.get("key").unwrap(), ["value"]);
    }

    #[test]
    fn test_form_urlencoded_key_without_value() {
        let result = inspect_form(b"flag&key=val");
        assert!(result.parse_success);

        let form = into_form(result);
        // A pair without '=' is stored with an empty value.
        assert_eq!(*form.get("flag").unwrap(), [""]);
        assert_eq!(*form.get("key").unwrap(), ["val"]);
    }

    #[test]
    fn test_form_urlencoded_empty_pairs_skipped() {
        // The doubled and the trailing '&' produce empty pairs, which are dropped.
        let result = inspect_form(b"a=1&&b=2&");
        assert!(result.parse_success);

        let form = into_form(result);
        assert_eq!(form.len(), 2);
        assert_eq!(*form.get("a").unwrap(), ["1"]);
        assert_eq!(*form.get("b").unwrap(), ["2"]);
    }

    #[test]
    fn test_json_mixed_depth_limit() {
        let inspector = inspector_with_depth(3);

        // Object -> array -> object is three containers, allowed by a limit of 3.
        let result = inspect_json(&inspector, br#"{"arr": [{"key": "value"}]}"#);
        assert!(result.parse_success);
    }
}