pcapsql_core/stream/
mod.rs

1//! TCP stream processing and application-layer parsing.
2//!
3//! This module provides TCP connection tracking and stream reassembly,
4//! enabling parsing of application-layer protocols that span multiple packets.
5//!
6//! ## Components
7//!
8//! - [`ConnectionTracker`] - Tracks TCP connections via 3-way handshake
9//! - [`TcpReassembler`] - Reassembles TCP segments into ordered byte streams
10//! - [`StreamManager`] - Coordinates tracking, reassembly, and parsing
11//! - [`StreamParser`] - Trait for application-layer parsers (HTTP, TLS)
12//! - [`StreamRegistry`] - Registry of stream parsers
13//!
14//! ## Built-in Stream Parsers
15//!
16//! - HTTP (request/response parsing)
17//! - TLS (handshake analysis, SNI extraction)
18//!
19//! ## Example
20//!
21//! ```rust,no_run
22//! use pcapsql_core::stream::{StreamManager, StreamConfig};
23//!
24//! let config = StreamConfig::default();
25//! let mut manager = StreamManager::new(config);
26//!
27//! // Process TCP segments from packets...
28//! // manager.process_segment(...);
29//! ```
30
31mod connection;
32mod context;
33mod manager;
34mod parser;
35pub mod parsers;
36mod reassembly;
37mod registry;
38
39pub use connection::{Connection, ConnectionKey, ConnectionState, ConnectionTracker, TcpFlags};
40pub use context::{Direction, ParsedMessage, StreamContext, StreamParseResult};
41pub use manager::{StreamConfig, StreamManager};
42pub use parser::StreamParser;
43pub use parsers::{
44    DecryptingTlsStreamParser, Http2StreamParser, HttpStreamParser, TlsStreamParser,
45};
46pub use reassembly::{Segment, SequenceGap, StreamBuffer, StreamKey, StreamStats, TcpReassembler};
47pub use registry::StreamRegistry;
48
49#[cfg(test)]
50mod tests {
51    use super::*;
52    use crate::schema::FieldDescriptor;
53    use std::net::{IpAddr, Ipv4Addr};
54
55    // Test 1: StreamContext construction
56    #[test]
57    fn test_stream_context_new() {
58        let ctx = StreamContext {
59            connection_id: 1,
60            direction: Direction::ToServer,
61            src_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
62            dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
63            src_port: 54321,
64            dst_port: 80,
65            bytes_parsed: 0,
66            messages_parsed: 0,
67            alpn: None,
68        };
69        assert_eq!(ctx.connection_id, 1);
70        assert_eq!(ctx.direction, Direction::ToServer);
71    }
72
73    // Test 2: StreamParseResult variants
74    #[test]
75    fn test_parse_result_complete() {
76        let result = StreamParseResult::Complete {
77            messages: vec![],
78            bytes_consumed: 100,
79        };
80        match result {
81            StreamParseResult::Complete { bytes_consumed, .. } => {
82                assert_eq!(bytes_consumed, 100);
83            }
84            _ => panic!("Expected Complete"),
85        }
86    }
87
88    #[test]
89    fn test_parse_result_need_more() {
90        let result = StreamParseResult::NeedMore {
91            minimum_bytes: Some(50),
92        };
93        match result {
94            StreamParseResult::NeedMore { minimum_bytes } => {
95                assert_eq!(minimum_bytes, Some(50));
96            }
97            _ => panic!("Expected NeedMore"),
98        }
99    }
100
101    // Test 3: StreamRegistry registration
102    // (Need a mock parser for this)
103    struct MockParser;
104    impl StreamParser for MockParser {
105        fn name(&self) -> &'static str {
106            "mock"
107        }
108        fn can_parse_stream(&self, _: &StreamContext) -> bool {
109            true
110        }
111        fn parse_stream(&self, _: &[u8], _: &StreamContext) -> StreamParseResult {
112            StreamParseResult::NotThisProtocol
113        }
114        fn message_schema(&self) -> Vec<FieldDescriptor> {
115            vec![]
116        }
117    }
118
119    #[test]
120    fn test_registry_register() {
121        let mut registry = StreamRegistry::new();
122        registry.register(MockParser);
123        assert_eq!(registry.parser_names(), vec!["mock"]);
124    }
125
126    // Test 4: StreamRegistry find_parser
127    #[test]
128    fn test_registry_find_parser() {
129        let mut registry = StreamRegistry::new();
130        registry.register(MockParser);
131
132        let ctx = StreamContext {
133            connection_id: 1,
134            direction: Direction::ToServer,
135            src_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
136            dst_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
137            src_port: 1234,
138            dst_port: 80,
139            bytes_parsed: 0,
140            messages_parsed: 0,
141            alpn: None,
142        };
143
144        let parser = registry.find_parser(&ctx);
145        assert!(parser.is_some());
146        assert_eq!(parser.unwrap().name(), "mock");
147    }
148
149    // Test 5: StreamRegistry get_parser by name
150    #[test]
151    fn test_registry_get_parser() {
152        let mut registry = StreamRegistry::new();
153        registry.register(MockParser);
154
155        assert!(registry.get_parser("mock").is_some());
156        assert!(registry.get_parser("nonexistent").is_none());
157    }
158
159    // Test 6: Empty registry behavior
160    #[test]
161    fn test_registry_empty() {
162        let registry = StreamRegistry::new();
163        assert!(registry.parser_names().is_empty());
164
165        let ctx = StreamContext {
166            connection_id: 1,
167            direction: Direction::ToServer,
168            src_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
169            dst_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
170            src_port: 1234,
171            dst_port: 80,
172            bytes_parsed: 0,
173            messages_parsed: 0,
174            alpn: None,
175        };
176        assert!(registry.find_parser(&ctx).is_none());
177    }
178}