1pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod state;
33pub mod table;
34pub mod tcp_reassembly;
35pub mod tcp_state;
36pub mod udp_state;
37
38pub use config::FlowConfig;
40pub use error::FlowError;
41pub use icmp_state::IcmpFlowState;
42pub use key::{
43 CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
44};
45pub use state::{
46 ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
47};
48pub use table::ConversationTable;
49pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
50pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
51pub use udp_state::UdpFlowState;
52
53use std::collections::HashMap;
54
55use crate::layer::LayerKind;
56use crate::pcap::CapturedPacket;
57
58pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
67 extract_flows_with_config(packets, FlowConfig::default())
68}
69
70pub fn extract_flows_with_config(
72 packets: &[CapturedPacket],
73 config: FlowConfig,
74) -> Result<Vec<ConversationState>, FlowError> {
75 let table = ConversationTable::new(config);
76
77 for (index, captured) in packets.iter().enumerate() {
78 let timestamp = captured.metadata.timestamp;
79 table.ingest_packet(&captured.packet, timestamp, index)?;
80 }
81
82 Ok(table.into_conversations())
83}
84
85pub fn extract_zwave_flows(
93 packets: &[CapturedPacket],
94) -> Result<Vec<ConversationState>, FlowError> {
95 let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
96
97 for (index, captured) in packets.iter().enumerate() {
98 let timestamp = captured.metadata.timestamp;
99 let packet = &captured.packet;
100
101 if packet.get_layer(LayerKind::ZWave).is_none() {
103 continue;
104 }
105
106 let (key, direction) = match extract_zwave_key(packet) {
107 Ok(result) => result,
108 Err(_) => continue,
109 };
110
111 let byte_count = packet.as_bytes().len() as u64;
112 let buf = packet.as_bytes();
113
114 let conv = conversations.entry(key.clone()).or_insert_with(|| {
115 let mut state = ConversationState::new_zwave(key, timestamp);
116 if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
117 && let Some(zwave) = packet.zwave()
118 {
119 zw.home_id = zwave.home_id(buf).unwrap_or(0);
120 }
121 state
122 });
123
124 conv.record_packet(direction, byte_count, timestamp, index, false, false);
125
126 if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
128 && let Some(zwave) = packet.zwave()
129 {
130 if zwave.is_ack(buf) {
131 zw.ack_count += 1;
132 } else {
133 zw.command_count += 1;
134 }
135 }
136 }
137
138 let mut result: Vec<ConversationState> = conversations.into_values().collect();
139 result.sort_by_key(|c| c.start_time);
140 Ok(result)
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    // Endpoints shared by the fixtures below.
    const CLIENT: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
    const SERVER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 2);
    const RESOLVER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 3);
    const CLIENT_PORT: u16 = 12345;
    const SERVER_PORT: u16 = 80;

    /// Wraps `packet` in a `CapturedPacket` stamped at `timestamp_secs`
    /// whole seconds, with `orig_len` set to zero.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        let metadata = PcapMetadata {
            timestamp: Duration::from_secs(timestamp_secs),
            orig_len: 0,
        };
        CapturedPacket { packet, metadata }
    }

    /// Ethernet layer used by every fixture packet.
    fn ethernet_entry() -> LayerStackEntry {
        LayerStackEntry::Ethernet(
            EthernetBuilder::new()
                .dst(MacAddress::BROADCAST)
                .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
        )
    }

    /// IPv4 layer from `src_ip` to `dst_ip`.
    fn ipv4_entry(src_ip: Ipv4Addr, dst_ip: Ipv4Addr) -> LayerStackEntry {
        LayerStackEntry::Ipv4(Ipv4Builder::new().src(src_ip).dst(dst_ip))
    }

    /// Builds an Ethernet/IPv4/TCP packet.
    ///
    /// Each character of `flags` sets one TCP flag (`S`, `A`, `F`, `R`);
    /// any other character is ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let base = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        let tcp = flags.chars().fold(base, |tcp, flag| match flag {
            'S' => tcp.syn(),
            'A' => tcp.ack(),
            'F' => tcp.fin(),
            'R' => tcp.rst(),
            _ => tcp,
        });

        LayerStack::new()
            .push(ethernet_entry())
            .push(ipv4_entry(src_ip, dst_ip))
            .push(LayerStackEntry::Tcp(tcp))
            .build_packet()
    }

    /// Builds an Ethernet/IPv4/UDP packet.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        let udp = UdpBuilder::new().src_port(sport).dst_port(dport);

        LayerStack::new()
            .push(ethernet_entry())
            .push(ipv4_entry(src_ip, dst_ip))
            .push(LayerStackEntry::Udp(udp))
            .build_packet()
    }

    /// SYN from the client to the server.
    fn client_syn() -> Packet {
        tcp_packet(CLIENT, SERVER, CLIENT_PORT, SERVER_PORT, "S")
    }

    /// SYN+ACK from the server back to the client.
    fn server_syn_ack() -> Packet {
        tcp_packet(SERVER, CLIENT, SERVER_PORT, CLIENT_PORT, "SA")
    }

    /// SYN at t=1s followed by SYN+ACK at t=2s.
    fn handshake_start() -> Vec<CapturedPacket> {
        vec![
            make_captured(client_syn(), 1),
            make_captured(server_syn_ack(), 2),
        ]
    }

    #[test]
    fn test_extract_flows_empty() {
        let conversations = extract_flows(&[]).unwrap();
        assert!(conversations.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = handshake_start();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);

        let conversation = &conversations[0];
        assert_eq!(conversation.total_packets(), 2);
        assert_eq!(conversation.key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        // A UDP datagram to a third host sits between the two TCP packets.
        let packets = vec![
            make_captured(client_syn(), 1),
            make_captured(udp_packet(CLIENT, RESOLVER, 54321, 53), 2),
            make_captured(server_syn_ack(), 3),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = handshake_start();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }
}