Skip to main content

hyperi_rustlib/transport/
detect.rs

// Project:   hyperi-rustlib
// File:      src/transport/detect.rs
// Purpose:   Stateful payload format detection with auto-locking
// Language:  Rust
//
// License:   FSL-1.1-ALv2
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! # Format Detection
//!
//! Stateful payload format detection that auto-locks to the first detected format.
//! Messages in a mismatched format are rejected so the caller can route them to a
//! dead-letter queue (DLQ); in Auto mode the detector re-locks to the new format
//! after sustained mismatches.
//!
//! ## Modes
//!
//! - **Auto** (default): Detect the format from the first message and lock to it
//! - **ForceJson**: Only accept JSON, reject MessagePack
//! - **ForceMessagePack**: Only accept MessagePack, reject JSON
//!
//! ## Example
//!
//! ```rust
//! use hyperi_rustlib::transport::{FormatDetector, FormatMode, DetectedFormat};
//!
//! let detector = FormatDetector::new();
//!
//! // First message sets the format
//! let result = detector.check_and_detect(br#"{"event": "login"}"#);
//! assert!(result.is_ok());
//! assert_eq!(detector.format(), DetectedFormat::Json);
//!
//! // Subsequent messages must match
//! let result = detector.check_and_detect(br#"{"event": "logout"}"#);
//! assert!(result.is_ok());
//!
//! // Mismatched format returns Err (send to DLQ)
//! let msgpack = [0x81, 0xa3, b'f', b'o', b'o'];
//! let result = detector.check_and_detect(&msgpack);
//! assert!(result.is_err());
//! ```
42
43use std::sync::atomic::{AtomicU8, Ordering};
44
/// Concrete wire format recognised by the stateful detector.
///
/// Kept separate from `PayloadFormat`, which includes `Auto` for config purposes.
/// The explicit `u8` discriminants are the values stored in the detector's atomic
/// state, so they must stay in sync with the `From<u8>` conversion.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DetectedFormat {
    /// No format has been detected yet.
    Unknown = 0,
    /// JSON payload.
    Json = 1,
    /// MessagePack payload.
    MessagePack = 2,
}
58
59impl From<u8> for DetectedFormat {
60    fn from(v: u8) -> Self {
61        match v {
62            1 => DetectedFormat::Json,
63            2 => DetectedFormat::MessagePack,
64            _ => DetectedFormat::Unknown,
65        }
66    }
67}
68
/// How a detector decides which payload format to accept.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FormatMode {
    /// Detect the format from the first message and lock to it (default).
    #[default]
    Auto,
    /// Accept JSON only; MessagePack is rejected.
    ForceJson,
    /// Accept MessagePack only; JSON is rejected.
    ForceMessagePack,
}
80
81impl FormatMode {
82    /// Parse from string (for config).
83    #[must_use]
84    pub fn parse(s: &str) -> Option<Self> {
85        match s.to_lowercase().as_str() {
86            "auto" => Some(FormatMode::Auto),
87            "json" => Some(FormatMode::ForceJson),
88            "messagepack" | "msgpack" => Some(FormatMode::ForceMessagePack),
89            _ => None,
90        }
91    }
92}
93
94/// Stateful format detector with auto-detection and locking.
95///
96/// Once a format is detected, the detector locks to that format.
97/// Mismatches return `Err` (for DLQ routing). After sustained mismatches
98/// (configurable threshold), the detector auto-resets in Auto mode.
99pub struct FormatDetector {
100    detected_format: AtomicU8,
101    mismatch_count: AtomicU8,
102    mode: FormatMode,
103}
104
105impl FormatDetector {
106    /// Threshold of consecutive mismatches before considering format reset (Auto mode only)
107    const MISMATCH_THRESHOLD: u8 = 10;
108
109    /// Create a new detector in Auto mode.
110    #[must_use]
111    pub const fn new() -> Self {
112        Self {
113            detected_format: AtomicU8::new(DetectedFormat::Unknown as u8),
114            mismatch_count: AtomicU8::new(0),
115            mode: FormatMode::Auto,
116        }
117    }
118
119    /// Create a detector with a specific mode.
120    #[must_use]
121    pub fn with_mode(mode: FormatMode) -> Self {
122        let initial_format = match mode {
123            FormatMode::Auto => DetectedFormat::Unknown,
124            FormatMode::ForceJson => DetectedFormat::Json,
125            FormatMode::ForceMessagePack => DetectedFormat::MessagePack,
126        };
127        Self {
128            detected_format: AtomicU8::new(initial_format as u8),
129            mismatch_count: AtomicU8::new(0),
130            mode,
131        }
132    }
133
134    /// Get the current mode.
135    #[must_use]
136    pub fn mode(&self) -> FormatMode {
137        self.mode
138    }
139
140    /// Get the currently detected format.
141    #[must_use]
142    pub fn format(&self) -> DetectedFormat {
143        DetectedFormat::from(self.detected_format.load(Ordering::Relaxed))
144    }
145
146    /// Check if format matches expected, tracking mismatches.
147    ///
148    /// Returns `Ok(format)` if message should be processed, `Err(expected)` if it
149    /// should go to DLQ (expected format returned for error context).
150    #[inline]
151    pub fn check_and_detect(&self, payload: &[u8]) -> Result<DetectedFormat, DetectedFormat> {
152        let detected = detect_format_bytes(payload);
153
154        // Handle forced modes - no auto-detection, no reset
155        match self.mode {
156            FormatMode::ForceJson => {
157                return match detected {
158                    Some(DetectedFormat::Json) => Ok(DetectedFormat::Json),
159                    _ => Err(DetectedFormat::Json), // Expected JSON, got something else -> DLQ
160                };
161            }
162            FormatMode::ForceMessagePack => {
163                return match detected {
164                    Some(DetectedFormat::MessagePack) => Ok(DetectedFormat::MessagePack),
165                    _ => Err(DetectedFormat::MessagePack), // Expected MsgPack, got something else -> DLQ
166                };
167            }
168            FormatMode::Auto => {} // Continue with auto-detection logic
169        }
170
171        // Auto mode logic
172        let current = self.format();
173
174        match (current, detected) {
175            // First message - set the format
176            (DetectedFormat::Unknown, Some(fmt)) => {
177                self.detected_format.store(fmt as u8, Ordering::Relaxed);
178                self.mismatch_count.store(0, Ordering::Relaxed);
179                Ok(fmt)
180            }
181
182            // Unknown format in payload - DLQ
183            (_, None) => Err(DetectedFormat::Unknown),
184
185            // Format matches - process
186            (expected, Some(actual)) if expected == actual => {
187                self.mismatch_count.store(0, Ordering::Relaxed);
188                Ok(actual)
189            }
190
191            // Format mismatch - check if we should reset
192            (expected, Some(actual)) => {
193                let count = self.mismatch_count.fetch_add(1, Ordering::Relaxed);
194                if count >= Self::MISMATCH_THRESHOLD {
195                    // Too many mismatches - assume format changed, reset
196                    self.detected_format.store(actual as u8, Ordering::Relaxed);
197                    self.mismatch_count.store(0, Ordering::Relaxed);
198                    #[cfg(feature = "logger")]
199                    tracing::warn!(
200                        old = ?expected,
201                        new = ?actual,
202                        "Format changed after {} mismatches, resetting",
203                        count
204                    );
205                    Ok(actual)
206                } else {
207                    // Mismatch - send to DLQ
208                    Err(expected)
209                }
210            }
211        }
212    }
213
214    /// Force reset to unknown (for testing or manual override, Auto mode only).
215    pub fn reset(&self) {
216        if self.mode == FormatMode::Auto {
217            self.detected_format
218                .store(DetectedFormat::Unknown as u8, Ordering::Relaxed);
219            self.mismatch_count.store(0, Ordering::Relaxed);
220        }
221    }
222}
223
224impl Default for FormatDetector {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
230/// Detect payload format from raw bytes (internal).
231///
232/// Optimized for the common case where JSON starts with '{' at position 0.
233#[inline]
234fn detect_format_bytes(payload: &[u8]) -> Option<DetectedFormat> {
235    // Fast path: check first byte directly (common case - no leading whitespace)
236    let first_byte = *payload.first()?;
237
238    // Most common case: JSON object starting with '{'
239    if first_byte == b'{' || first_byte == b'[' {
240        return Some(DetectedFormat::Json);
241    }
242
243    // Check for MessagePack before considering whitespace
244    // (MessagePack never starts with whitespace-like bytes)
245    // MessagePack: fixmap (0x80-0x8F), map16/32 (0xDE/0xDF), fixarray (0x90-0x9F), array16/32 (0xDC/0xDD)
246    if matches!(first_byte, 0x80..=0x8F | 0xDE | 0xDF | 0x90..=0x9F | 0xDC | 0xDD) {
247        return Some(DetectedFormat::MessagePack);
248    }
249
250    // Slow path: skip leading whitespace for JSON (rare case)
251    if first_byte.is_ascii_whitespace() {
252        for &b in payload.iter().skip(1) {
253            if !b.is_ascii_whitespace() {
254                return match b {
255                    b'{' | b'[' => Some(DetectedFormat::Json),
256                    _ => None,
257                };
258            }
259        }
260        return None; // All whitespace
261    }
262
263    None
264}
265
266/// Stateless format detection (convenience function).
267///
268/// For stateful detection with locking, use `FormatDetector`.
269#[inline]
270#[must_use]
271pub fn detect_format(payload: &[u8]) -> Option<DetectedFormat> {
272    detect_format_bytes(payload)
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal JSON object payload shared by the detector tests.
    const JSON_OBJECT: &[u8] = b"{\"key\": 1}";
    /// Payload starting with a MessagePack fixmap marker. It is truncated, which
    /// is fine because detection only looks at the first byte.
    const MSGPACK_MAP: &[u8] = &[0x81, 0xA1, b'k'];

    #[test]
    fn test_detect_json_object() {
        assert_eq!(
            detect_format(b"{\"key\": \"value\"}"),
            Some(DetectedFormat::Json)
        );
    }

    #[test]
    fn test_detect_json_array() {
        assert_eq!(detect_format(b"[1, 2, 3]"), Some(DetectedFormat::Json));
    }

    #[test]
    fn test_detect_json_with_whitespace() {
        assert_eq!(
            detect_format(b"  \n\t{\"key\": 1}"),
            Some(DetectedFormat::Json)
        );
    }

    #[test]
    fn test_detect_whitespace_then_non_json() {
        // Leading whitespace only helps when an object/array opener follows it.
        assert_eq!(detect_format(b"  hello"), None);
    }

    #[test]
    fn test_detect_msgpack_fixmap() {
        assert_eq!(
            detect_format(&[0x81, 0xA3, b'k', b'e', b'y']),
            Some(DetectedFormat::MessagePack)
        );
    }

    #[test]
    fn test_detect_msgpack_map16() {
        assert_eq!(
            detect_format(&[0xDE, 0x00, 0x01]),
            Some(DetectedFormat::MessagePack)
        );
    }

    #[test]
    fn test_detect_msgpack_arrays() {
        // fixarray
        assert_eq!(
            detect_format(&[0x93, 0x01, 0x02, 0x03]),
            Some(DetectedFormat::MessagePack)
        );
        // array16
        assert_eq!(
            detect_format(&[0xDC, 0x00, 0x01]),
            Some(DetectedFormat::MessagePack)
        );
    }

    #[test]
    fn test_detect_msgpack_scalar_is_unknown() {
        // Only map/array markers are recognised; nil (0xC0) is not.
        assert_eq!(detect_format(&[0xC0]), None);
    }

    #[test]
    fn test_detect_empty() {
        assert_eq!(detect_format(b""), None);
    }

    #[test]
    fn test_detect_whitespace_only() {
        assert_eq!(detect_format(b"   \n\t  "), None);
    }

    #[test]
    fn test_detect_unknown() {
        assert_eq!(detect_format(b"hello"), None);
    }

    #[test]
    fn test_format_detector_auto_detect() {
        let detector = FormatDetector::new();
        assert_eq!(detector.format(), DetectedFormat::Unknown);

        // First JSON message sets format
        let result = detector.check_and_detect(JSON_OBJECT);
        assert_eq!(result, Ok(DetectedFormat::Json));
        assert_eq!(detector.format(), DetectedFormat::Json);

        // Subsequent JSON messages pass
        assert_eq!(
            detector.check_and_detect(b"{\"key\": 2}"),
            Ok(DetectedFormat::Json)
        );

        // MessagePack mismatch goes to DLQ
        assert_eq!(
            detector.check_and_detect(MSGPACK_MAP),
            Err(DetectedFormat::Json)
        );
    }

    #[test]
    fn test_format_detector_mismatch_reset() {
        let detector = FormatDetector::new();

        // Set to JSON
        detector.check_and_detect(JSON_OBJECT).unwrap();

        // The first MISMATCH_THRESHOLD mismatches are rejected and keep the lock.
        for _ in 0..FormatDetector::MISMATCH_THRESHOLD {
            assert_eq!(
                detector.check_and_detect(MSGPACK_MAP),
                Err(DetectedFormat::Json)
            );
            assert_eq!(detector.format(), DetectedFormat::Json);
        }

        // The next mismatch switches the lock and is accepted.
        assert_eq!(
            detector.check_and_detect(MSGPACK_MAP),
            Ok(DetectedFormat::MessagePack)
        );
        assert_eq!(detector.format(), DetectedFormat::MessagePack);
    }

    #[test]
    fn test_format_detector_match_resets_mismatch_count() {
        let detector = FormatDetector::new();
        detector.check_and_detect(JSON_OBJECT).unwrap();

        for _ in 0..FormatDetector::MISMATCH_THRESHOLD {
            assert_eq!(
                detector.check_and_detect(MSGPACK_MAP),
                Err(DetectedFormat::Json)
            );
        }

        // A matching message clears the counter...
        assert_eq!(
            detector.check_and_detect(JSON_OBJECT),
            Ok(DetectedFormat::Json)
        );

        // ...so a further full run of mismatches is still rejected.
        for _ in 0..FormatDetector::MISMATCH_THRESHOLD {
            assert_eq!(
                detector.check_and_detect(MSGPACK_MAP),
                Err(DetectedFormat::Json)
            );
        }
        assert_eq!(detector.format(), DetectedFormat::Json);
    }

    #[test]
    fn test_format_detector_unrecognised_payload() {
        let detector = FormatDetector::new();

        // Before any lock: rejected, nothing gets locked.
        assert_eq!(
            detector.check_and_detect(b"hello"),
            Err(DetectedFormat::Unknown)
        );
        assert_eq!(detector.check_and_detect(b""), Err(DetectedFormat::Unknown));
        assert_eq!(detector.format(), DetectedFormat::Unknown);

        // After locking: still reported as Unknown and never counted as a mismatch.
        detector.check_and_detect(JSON_OBJECT).unwrap();
        for _ in 0..20 {
            assert_eq!(
                detector.check_and_detect(b"hello"),
                Err(DetectedFormat::Unknown)
            );
        }
        assert_eq!(detector.format(), DetectedFormat::Json);
    }

    #[test]
    fn test_reset() {
        // Auto mode: reset clears the lock so the next message re-detects.
        let detector = FormatDetector::new();
        detector.check_and_detect(JSON_OBJECT).unwrap();
        detector.reset();
        assert_eq!(detector.format(), DetectedFormat::Unknown);
        assert_eq!(
            detector.check_and_detect(MSGPACK_MAP),
            Ok(DetectedFormat::MessagePack)
        );

        // Forced mode: reset is a no-op.
        let forced = FormatDetector::with_mode(FormatMode::ForceJson);
        forced.reset();
        assert_eq!(forced.format(), DetectedFormat::Json);
    }

    #[test]
    fn test_force_json_mode() {
        let detector = FormatDetector::with_mode(FormatMode::ForceJson);
        assert_eq!(detector.mode(), FormatMode::ForceJson);
        assert_eq!(detector.format(), DetectedFormat::Json);

        // JSON passes
        assert_eq!(
            detector.check_and_detect(JSON_OBJECT),
            Ok(DetectedFormat::Json)
        );

        // MessagePack fails immediately (no mismatch counting)
        assert_eq!(
            detector.check_and_detect(MSGPACK_MAP),
            Err(DetectedFormat::Json)
        );

        // Unknown format also fails
        assert_eq!(
            detector.check_and_detect(b"hello"),
            Err(DetectedFormat::Json)
        );

        // Format stays locked
        assert_eq!(detector.format(), DetectedFormat::Json);
    }

    #[test]
    fn test_force_msgpack_mode() {
        let detector = FormatDetector::with_mode(FormatMode::ForceMessagePack);
        assert_eq!(detector.mode(), FormatMode::ForceMessagePack);
        assert_eq!(detector.format(), DetectedFormat::MessagePack);

        // MessagePack passes
        assert_eq!(
            detector.check_and_detect(MSGPACK_MAP),
            Ok(DetectedFormat::MessagePack)
        );

        // JSON fails immediately
        assert_eq!(
            detector.check_and_detect(JSON_OBJECT),
            Err(DetectedFormat::MessagePack)
        );

        // Format stays locked
        assert_eq!(detector.format(), DetectedFormat::MessagePack);
    }

    #[test]
    fn test_force_mode_no_reset() {
        let detector = FormatDetector::with_mode(FormatMode::ForceJson);

        // Send many MessagePack messages - should NOT reset
        for _ in 0..20 {
            assert_eq!(
                detector.check_and_detect(MSGPACK_MAP),
                Err(DetectedFormat::Json)
            );
        }

        // Format should still be JSON (no auto-reset in force mode)
        assert_eq!(detector.format(), DetectedFormat::Json);
    }

    #[test]
    fn test_format_mode_from_str() {
        assert_eq!(FormatMode::parse("auto"), Some(FormatMode::Auto));
        assert_eq!(FormatMode::parse("AUTO"), Some(FormatMode::Auto));
        assert_eq!(FormatMode::parse("json"), Some(FormatMode::ForceJson));
        assert_eq!(FormatMode::parse("JSON"), Some(FormatMode::ForceJson));
        assert_eq!(
            FormatMode::parse("messagepack"),
            Some(FormatMode::ForceMessagePack)
        );
        assert_eq!(
            FormatMode::parse("msgpack"),
            Some(FormatMode::ForceMessagePack)
        );
        assert_eq!(FormatMode::parse("invalid"), None);
    }
}