pcapsql_core/stream/
mod.rs1mod connection;
32mod context;
33mod manager;
34mod parser;
35pub mod parsers;
36mod reassembly;
37mod registry;
38
39pub use connection::{Connection, ConnectionKey, ConnectionState, ConnectionTracker, TcpFlags};
40pub use context::{Direction, ParsedMessage, StreamContext, StreamParseResult};
41pub use manager::{StreamConfig, StreamManager};
42pub use parser::StreamParser;
43pub use parsers::{
44 DecryptingTlsStreamParser, Http2StreamParser, HttpStreamParser, TlsStreamParser,
45};
46pub use reassembly::{Segment, SequenceGap, StreamBuffer, StreamKey, StreamStats, TcpReassembler};
47pub use registry::StreamRegistry;
48
// Smoke tests for the types re-exported by this module: context construction,
// parse-result variants, and parser registration/lookup in the registry.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::FieldDescriptor;
    use std::net::{IpAddr, Ipv4Addr};

    /// Builds the context shared by the registry lookup tests: a to-server
    /// flow between two loopback endpoints (port 1234 -> 80) with zeroed
    /// counters and no ALPN value.
    fn loopback_context() -> StreamContext {
        StreamContext {
            connection_id: 1,
            direction: Direction::ToServer,
            src_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
            dst_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
            src_port: 1234,
            dst_port: 80,
            bytes_parsed: 0,
            messages_parsed: 0,
            alpn: None,
        }
    }

    /// A `StreamContext` built from a struct literal keeps the values it was given.
    #[test]
    fn test_stream_context_new() {
        // Spelled out in full on purpose: this test is about the literal itself.
        let ctx = StreamContext {
            connection_id: 1,
            direction: Direction::ToServer,
            src_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
            dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
            src_port: 54321,
            dst_port: 80,
            bytes_parsed: 0,
            messages_parsed: 0,
            alpn: None,
        };
        assert_eq!(ctx.connection_id, 1);
        assert_eq!(ctx.direction, Direction::ToServer);
    }

    /// A `Complete` result hands back the `bytes_consumed` it was built with.
    #[test]
    fn test_parse_result_complete() {
        let result = StreamParseResult::Complete {
            messages: vec![],
            bytes_consumed: 100,
        };
        match result {
            StreamParseResult::Complete { bytes_consumed, .. } => {
                assert_eq!(bytes_consumed, 100);
            }
            _ => panic!("Expected Complete"),
        }
    }

    /// A `NeedMore` result hands back the `minimum_bytes` hint it was built with.
    #[test]
    fn test_parse_result_need_more() {
        let result = StreamParseResult::NeedMore {
            minimum_bytes: Some(50),
        };
        match result {
            StreamParseResult::NeedMore { minimum_bytes } => {
                assert_eq!(minimum_bytes, Some(50));
            }
            _ => panic!("Expected NeedMore"),
        }
    }

    /// Minimal `StreamParser` used to exercise the registry: it is named
    /// "mock", accepts every stream, rejects all payloads as
    /// `NotThisProtocol`, and advertises an empty schema.
    struct MockParser;

    impl StreamParser for MockParser {
        fn name(&self) -> &'static str {
            "mock"
        }

        fn can_parse_stream(&self, _: &StreamContext) -> bool {
            true
        }

        fn parse_stream(&self, _: &[u8], _: &StreamContext) -> StreamParseResult {
            StreamParseResult::NotThisProtocol
        }

        fn message_schema(&self) -> Vec<FieldDescriptor> {
            vec![]
        }
    }

    /// Registering a parser makes its name show up in `parser_names`.
    #[test]
    fn test_registry_register() {
        let mut registry = StreamRegistry::new();
        registry.register(MockParser);
        assert_eq!(registry.parser_names(), vec!["mock"]);
    }

    /// `find_parser` returns the registered parser whose `can_parse_stream` accepts the context.
    #[test]
    fn test_registry_find_parser() {
        let mut registry = StreamRegistry::new();
        registry.register(MockParser);

        let ctx = loopback_context();

        // `expect` both asserts presence and yields the parser, replacing the
        // separate `is_some()` check followed by `unwrap()`.
        let parser = registry
            .find_parser(&ctx)
            .expect("MockParser accepts every stream, so a parser must be found");
        assert_eq!(parser.name(), "mock");
    }

    /// `get_parser` looks parsers up by name and yields `None` for unknown names.
    #[test]
    fn test_registry_get_parser() {
        let mut registry = StreamRegistry::new();
        registry.register(MockParser);

        assert!(registry.get_parser("mock").is_some());
        assert!(registry.get_parser("nonexistent").is_none());
    }

    /// A fresh registry lists no parsers and finds none for any context.
    #[test]
    fn test_registry_empty() {
        let registry = StreamRegistry::new();
        assert!(registry.parser_names().is_empty());

        let ctx = loopback_context();
        assert!(registry.find_parser(&ctx).is_none());
    }
}