1pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod spill;
33pub mod state;
34pub mod table;
35pub mod tcp_reassembly;
36pub mod tcp_state;
37pub mod udp_state;
38
39pub use config::FlowConfig;
41pub use error::FlowError;
42pub use icmp_state::IcmpFlowState;
43pub use key::{
44 CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
45};
46pub use state::{
47 ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
48};
49pub use table::ConversationTable;
50pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
51pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
52pub use udp_state::UdpFlowState;
53
54use std::collections::HashMap;
55use std::path::Path;
56
57use crate::error::PacketError;
58use crate::layer::LayerKind;
59use crate::pcap::{CaptureIterator, CapturedPacket};
60
61pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
70 extract_flows_with_config(packets, FlowConfig::default())
71}
72
73pub fn extract_flows_with_config(
75 packets: &[CapturedPacket],
76 config: FlowConfig,
77) -> Result<Vec<ConversationState>, FlowError> {
78 let table = ConversationTable::new(config);
79
80 for (index, captured) in packets.iter().enumerate() {
81 let timestamp = captured.metadata.timestamp;
82 table.ingest_packet(&captured.packet, timestamp, index)?;
83 }
84
85 Ok(table.into_conversations())
86}
87
88pub fn extract_flows_streaming<I>(
97 packets: I,
98 config: FlowConfig,
99) -> Result<Vec<ConversationState>, FlowError>
100where
101 I: Iterator<Item = Result<CapturedPacket, PacketError>>,
102{
103 let table = ConversationTable::new(config);
104
105 for (index, result) in packets.enumerate() {
106 let captured = result.map_err(FlowError::PacketError)?;
107 let timestamp = captured.metadata.timestamp;
108 table.ingest_packet(&captured.packet, timestamp, index)?;
109 }
111
112 Ok(table.into_conversations())
113}
114
115pub fn extract_flows_from_file(
120 path: impl AsRef<Path>,
121 config: FlowConfig,
122) -> Result<Vec<ConversationState>, FlowError> {
123 let iter = CaptureIterator::open(path).map_err(FlowError::PacketError)?;
124 extract_flows_streaming(iter, config)
125}
126
127pub fn extract_zwave_flows(
135 packets: &[CapturedPacket],
136) -> Result<Vec<ConversationState>, FlowError> {
137 let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
138
139 for (index, captured) in packets.iter().enumerate() {
140 let timestamp = captured.metadata.timestamp;
141 let packet = &captured.packet;
142
143 if packet.get_layer(LayerKind::ZWave).is_none() {
145 continue;
146 }
147
148 let (key, direction) = match extract_zwave_key(packet) {
149 Ok(result) => result,
150 Err(_) => continue,
151 };
152
153 let byte_count = packet.as_bytes().len() as u64;
154 let buf = packet.as_bytes();
155
156 let conv = conversations.entry(key.clone()).or_insert_with(|| {
157 let mut state = ConversationState::new_zwave(key, timestamp);
158 if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
159 && let Some(zwave) = packet.zwave()
160 {
161 zw.home_id = zwave.home_id(buf).unwrap_or(0);
162 }
163 state
164 });
165
166 conv.record_packet(direction, byte_count, timestamp, index, false, false);
167
168 if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
170 && let Some(zwave) = packet.zwave()
171 {
172 if zwave.is_ack(buf) {
173 zw.ack_count += 1;
174 } else {
175 zw.command_count += 1;
176 }
177 }
178 }
179
180 let mut result: Vec<ConversationState> = conversations.into_values().collect();
181 result.sort_by_key(|c| c.start_time);
182 Ok(result)
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    /// Host that sends the TCP SYN and the UDP datagram.
    const CLIENT: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
    /// Host that answers the TCP SYN with a SYN-ACK.
    const SERVER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 2);
    /// Destination of the UDP datagram.
    const UDP_PEER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 3);

    /// Wraps `packet` in a `CapturedPacket` stamped at `timestamp_secs` seconds.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        let metadata = PcapMetadata {
            timestamp: Duration::from_secs(timestamp_secs),
            orig_len: 0,
            ..Default::default()
        };
        CapturedPacket { packet, metadata }
    }

    /// Builds an Ethernet/IPv4 packet carrying the given transport layer.
    fn ipv4_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, transport: LayerStackEntry) -> Packet {
        let ethernet = EthernetBuilder::new()
            .dst(MacAddress::BROADCAST)
            .src(MacAddress::new([0, 1, 2, 3, 4, 5]));
        let ipv4 = Ipv4Builder::new().src(src_ip).dst(dst_ip);

        LayerStack::new()
            .push(LayerStackEntry::Ethernet(ethernet))
            .push(LayerStackEntry::Ipv4(ipv4))
            .push(transport)
            .build_packet()
    }

    /// Builds a TCP packet; each of `S`, `A`, `F`, `R` in `flags` sets the
    /// matching TCP flag, and any other character is ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let base = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        let tcp = flags.chars().fold(base, |builder, flag| match flag {
            'S' => builder.syn(),
            'A' => builder.ack(),
            'F' => builder.fin(),
            'R' => builder.rst(),
            _ => builder,
        });

        ipv4_packet(src_ip, dst_ip, LayerStackEntry::Tcp(tcp))
    }

    /// Builds a UDP packet between the given endpoints.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        let udp = UdpBuilder::new().src_port(sport).dst_port(dport);
        ipv4_packet(src_ip, dst_ip, LayerStackEntry::Udp(udp))
    }

    /// Client-to-server SYN captured at `timestamp_secs`.
    fn syn(timestamp_secs: u64) -> CapturedPacket {
        make_captured(tcp_packet(CLIENT, SERVER, 12345, 80, "S"), timestamp_secs)
    }

    /// Server-to-client SYN-ACK captured at `timestamp_secs`.
    fn syn_ack(timestamp_secs: u64) -> CapturedPacket {
        make_captured(tcp_packet(SERVER, CLIENT, 80, 12345, "SA"), timestamp_secs)
    }

    #[test]
    fn test_extract_flows_empty() {
        let conversations = extract_flows(&[]).unwrap();
        assert!(conversations.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = [syn(1), syn_ack(2)];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);

        let only = &conversations[0];
        assert_eq!(only.total_packets(), 2);
        assert_eq!(only.key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        let dns_query = make_captured(udp_packet(CLIENT, UDP_PEER, 54321, 53), 2);
        let packets = [syn(1), dns_query, syn_ack(3)];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = [syn(1), syn_ack(2)];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }
}