1pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod spill;
33pub mod state;
34pub mod table;
35pub mod tcp_reassembly;
36pub mod tcp_state;
37pub mod udp_state;
38
39pub use config::FlowConfig;
41pub use error::FlowError;
42pub use icmp_state::IcmpFlowState;
43pub use key::{
44 CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
45};
46pub use state::{
47 ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
48};
49pub use table::ConversationTable;
50pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
51pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
52pub use udp_state::UdpFlowState;
53
54use std::collections::HashMap;
55use std::path::Path;
56
57use crate::error::PacketError;
58use crate::layer::LayerKind;
59use crate::pcap::{CaptureIterator, CapturedPacket};
60
61pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
70 extract_flows_with_config(packets, FlowConfig::default())
71}
72
73pub fn extract_flows_with_config(
75 packets: &[CapturedPacket],
76 config: FlowConfig,
77) -> Result<Vec<ConversationState>, FlowError> {
78 let verbose = config.verbose;
79 let total = packets.len();
80 let table = ConversationTable::new(config);
81
82 if verbose {
83 eprintln!("[stackforge] Starting flow extraction ({total} packets)...");
84 }
85
86 for (index, captured) in packets.iter().enumerate() {
87 let timestamp = captured.metadata.timestamp;
88 table.ingest_packet(&captured.packet, timestamp, index)?;
89
90 if verbose && (index + 1) % 10_000 == 0 {
91 eprintln!(
92 "[stackforge] Processed {}/{total} packets ({} flows so far)",
93 index + 1,
94 table.conversation_count(),
95 );
96 }
97 }
98
99 let conversations = table.into_conversations();
100 if verbose {
101 eprintln!(
102 "[stackforge] Flow extraction complete: {total} packets -> {} conversations",
103 conversations.len(),
104 );
105 }
106 Ok(conversations)
107}
108
109pub fn extract_flows_streaming<I>(
118 packets: I,
119 config: FlowConfig,
120) -> Result<Vec<ConversationState>, FlowError>
121where
122 I: Iterator<Item = Result<CapturedPacket, PacketError>>,
123{
124 let verbose = config.verbose;
125 let table = ConversationTable::new(config);
126
127 if verbose {
128 eprintln!("[stackforge] Starting streaming flow extraction...");
129 }
130
131 for (index, result) in packets.enumerate() {
132 let captured = result.map_err(FlowError::PacketError)?;
133 let timestamp = captured.metadata.timestamp;
134 table.ingest_packet(&captured.packet, timestamp, index)?;
135 if verbose && (index + 1) % 10_000 == 0 {
138 eprintln!(
139 "[stackforge] Processed {} packets ({} flows so far)",
140 index + 1,
141 table.conversation_count(),
142 );
143 }
144 }
145
146 let conversations = table.into_conversations();
147 if verbose {
148 eprintln!(
149 "[stackforge] Flow extraction complete: {} conversations",
150 conversations.len(),
151 );
152 }
153 Ok(conversations)
154}
155
156pub fn extract_flows_from_file(
161 path: impl AsRef<Path>,
162 config: FlowConfig,
163) -> Result<Vec<ConversationState>, FlowError> {
164 let verbose = config.verbose;
165 if verbose {
166 eprintln!(
167 "[stackforge] Opening capture file: {}",
168 path.as_ref().display(),
169 );
170 }
171 let iter = CaptureIterator::open(path).map_err(FlowError::PacketError)?;
172 extract_flows_streaming(iter, config)
173}
174
175pub fn extract_zwave_flows(
183 packets: &[CapturedPacket],
184) -> Result<Vec<ConversationState>, FlowError> {
185 let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
186
187 for (index, captured) in packets.iter().enumerate() {
188 let timestamp = captured.metadata.timestamp;
189 let packet = &captured.packet;
190
191 if packet.get_layer(LayerKind::ZWave).is_none() {
193 continue;
194 }
195
196 let (key, direction) = match extract_zwave_key(packet) {
197 Ok(result) => result,
198 Err(_) => continue,
199 };
200
201 let byte_count = packet.as_bytes().len() as u64;
202 let buf = packet.as_bytes();
203
204 let conv = conversations.entry(key.clone()).or_insert_with(|| {
205 let mut state = ConversationState::new_zwave(key, timestamp);
206 if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
207 && let Some(zwave) = packet.zwave()
208 {
209 zw.home_id = zwave.home_id(buf).unwrap_or(0);
210 }
211 state
212 });
213
214 conv.record_packet(direction, byte_count, timestamp, index, false, false);
215
216 if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
218 && let Some(zwave) = packet.zwave()
219 {
220 if zwave.is_ack(buf) {
221 zw.ack_count += 1;
222 } else {
223 zw.command_count += 1;
224 }
225 }
226 }
227
228 let mut result: Vec<ConversationState> = conversations.into_values().collect();
229 result.sort_by_key(|c| c.start_time);
230 Ok(result)
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    /// Wraps `packet` in a `CapturedPacket` stamped at `timestamp_secs`.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        CapturedPacket {
            packet,
            metadata: PcapMetadata {
                timestamp: Duration::from_secs(timestamp_secs),
                orig_len: 0,
                ..Default::default()
            },
        }
    }

    /// Builds an Ethernet/IPv4/TCP packet. `flags` may contain any of
    /// `S`, `A`, `F`, `R`; other characters are ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let mut builder = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        for c in flags.chars() {
            builder = match c {
                'S' => builder.syn(),
                'A' => builder.ack(),
                'F' => builder.fin(),
                'R' => builder.rst(),
                _ => builder,
            };
        }

        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Tcp(builder))
            .build_packet()
    }

    /// Builds an Ethernet/IPv4/UDP packet.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Udp(
                UdpBuilder::new().src_port(sport).dst_port(dport),
            ))
            .build_packet()
    }

    /// A client SYN at t=1s followed by the server's SYN-ACK at t=2s on the
    /// same TCP port pair (10.0.0.1:12345 <-> 10.0.0.2:80).
    fn syn_synack_pair() -> Vec<CapturedPacket> {
        let client = Ipv4Addr::new(10, 0, 0, 1);
        let server = Ipv4Addr::new(10, 0, 0, 2);
        vec![
            make_captured(tcp_packet(client, server, 12345, 80, "S"), 1),
            make_captured(tcp_packet(server, client, 80, 12345, "SA"), 2),
        ]
    }

    #[test]
    fn test_extract_flows_empty() {
        let result = extract_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = syn_synack_pair();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].total_packets(), 2);
        assert_eq!(conversations[0].key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        let packets = vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                udp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 3),
                    54321,
                    53,
                ),
                2,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                3,
            ),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = syn_synack_pair();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }

    #[test]
    fn test_extract_flows_streaming_matches_slice() {
        let from_slice = extract_flows(&syn_synack_pair()).unwrap();
        let streamed = extract_flows_streaming(
            syn_synack_pair()
                .into_iter()
                .map(Ok::<CapturedPacket, crate::error::PacketError>),
            FlowConfig::default(),
        )
        .unwrap();

        assert_eq!(streamed.len(), from_slice.len());
        assert_eq!(streamed[0].total_packets(), from_slice[0].total_packets());
        assert_eq!(streamed[0].packet_indices, from_slice[0].packet_indices);
    }

    #[test]
    fn test_extract_zwave_flows_empty() {
        let result = extract_zwave_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_zwave_flows_ignores_non_zwave_packets() {
        let conversations = extract_zwave_flows(&syn_synack_pair()).unwrap();
        assert!(conversations.is_empty());
    }
}