1pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod spill;
33pub mod state;
34pub mod table;
35pub mod tcp_reassembly;
36pub mod tcp_state;
37pub mod udp_state;
38
39pub use config::FlowConfig;
41pub use error::FlowError;
42pub use icmp_state::IcmpFlowState;
43pub use key::{
44 CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
45};
46pub use state::{
47 ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
48};
49pub use table::ConversationTable;
50pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
51pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
52pub use udp_state::UdpFlowState;
53
54use std::collections::HashMap;
55use std::path::Path;
56use std::time::Instant;
57
58use crate::error::PacketError;
59use crate::layer::LayerKind;
60use crate::pcap::{CaptureIterator, CapturedPacket};
61
/// Renders a byte count with binary (1024-based) unit suffixes for progress output.
///
/// Sizes of at least one GB are shown with two decimals, MB and KB with one,
/// and anything below 1024 as a plain integer followed by `B`.
fn format_bytes(bytes: usize) -> String {
    const KB: usize = 1 << 10;
    const MB: usize = 1 << 20;
    const GB: usize = 1 << 30;

    let value = bytes as f64;
    match bytes {
        b if b >= GB => format!("{:.2} GB", value / GB as f64),
        b if b >= MB => format!("{:.1} MB", value / MB as f64),
        b if b >= KB => format!("{:.1} KB", value / KB as f64),
        _ => format!("{bytes} B"),
    }
}
77
/// Formats `n` in decimal with a `,` between every group of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
fn format_count(n: usize) -> String {
    let digits = n.to_string();
    let len = digits.len();

    // The leading group holds the 1-3 digits left over after splitting the
    // rest into exact triples.
    let head_len = match len % 3 {
        0 => 3,
        rem => rem,
    };
    let (head, tail) = digits.split_at(head_len);

    let mut grouped = String::with_capacity(len + len / 3);
    grouped.push_str(head);
    for triple in tail.as_bytes().chunks(3) {
        grouped.push(',');
        // `to_string` of an integer yields ASCII digits only, so each byte is a char.
        grouped.extend(triple.iter().map(|&b| char::from(b)));
    }
    grouped
}
90
/// Formats a duration given in seconds for progress/summary output.
///
/// - under one minute: seconds with one decimal place (`"5.0s"`);
/// - under one hour: whole minutes and seconds (`"1m 30s"`);
/// - otherwise: whole hours and minutes (`"2h 0m"`).
///
/// Durations of a minute or more are rounded to the nearest whole second
/// *before* being split into units. Rounding only the leftover seconds would
/// let them reach 60 and print e.g. `"1m 60s"` for 119.6 s.
fn format_duration(secs: f64) -> String {
    if secs >= 60.0 {
        let whole = secs.round();
        if whole >= 3600.0 {
            let h = (whole / 3600.0).floor();
            let m = ((whole % 3600.0) / 60.0).floor();
            format!("{h:.0}h {m:.0}m")
        } else {
            let m = (whole / 60.0).floor();
            let s = whole % 60.0;
            format!("{m:.0}m {s:.0}s")
        }
    } else {
        format!("{secs:.1}s")
    }
}
105
106pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
115 extract_flows_with_config(packets, FlowConfig::default())
116}
117
118pub fn extract_flows_with_config(
120 packets: &[CapturedPacket],
121 config: FlowConfig,
122) -> Result<Vec<ConversationState>, FlowError> {
123 let verbose = config.verbose;
124 let interval = config.progress_interval.max(1);
125 let total = packets.len();
126 let table = ConversationTable::new(config);
127
128 let wall_start = Instant::now();
129
130 if verbose {
131 eprintln!();
132 eprintln!("[+] stackforge flow extraction engine");
133 eprintln!("[+] Input: {} packets (in-memory)", format_count(total));
134 eprintln!("[+] Processing...");
135 eprintln!();
136 }
137
138 for (index, captured) in packets.iter().enumerate() {
139 let timestamp = captured.metadata.timestamp;
140 table.ingest_packet(&captured.packet, timestamp, index)?;
141
142 if verbose && (index + 1) % interval == 0 {
143 let elapsed = wall_start.elapsed().as_secs_f64();
144 let rate = (index + 1) as f64 / elapsed;
145 let pct = (index + 1) as f64 / total as f64 * 100.0;
146 let remaining = (total - index - 1) as f64 / rate;
147 let mem = table.memory_usage();
148 eprintln!(
149 " [{:5.1}%] {} pkts | {} flows | {}/s | mem ~{} | ETA {}",
150 pct,
151 format_count(index + 1),
152 format_count(table.conversation_count()),
153 format_count(rate as usize),
154 format_bytes(mem),
155 format_duration(remaining),
156 );
157 }
158 }
159
160 if verbose {
161 eprintln!();
162 }
163 let conversations = table.into_conversations();
164 if verbose {
165 let elapsed = wall_start.elapsed().as_secs_f64();
166 let rate = total as f64 / elapsed;
167 eprintln!(
168 "[+] Complete: {} packets -> {} flows",
169 format_count(total),
170 format_count(conversations.len())
171 );
172 eprintln!(
173 "[+] Wall time: {} ({}/s avg)",
174 format_duration(elapsed),
175 format_count(rate as usize)
176 );
177
178 let (total_drops, flows_with_drops) = count_dropped_segments(&conversations);
179 if total_drops > 0 {
180 eprintln!(
181 "[!] Warning: {} TCP segments dropped across {} flows (buffer/fragment limits exceeded)",
182 format_count(total_drops as usize),
183 format_count(flows_with_drops),
184 );
185 eprintln!(
186 "[!] Tip: increase max_reassembly_buffer or max_ooo_fragments to capture more data"
187 );
188 }
189 eprintln!();
190 }
191 Ok(conversations)
192}
193
194pub fn extract_flows_streaming<I>(
203 packets: I,
204 config: FlowConfig,
205) -> Result<Vec<ConversationState>, FlowError>
206where
207 I: Iterator<Item = Result<CapturedPacket, PacketError>>,
208{
209 let verbose = config.verbose;
210 let interval = config.progress_interval.max(1);
211 let has_budget = config.memory_budget.is_some();
212 let budget_str = config
213 .memory_budget
214 .map(|b| format_bytes(b))
215 .unwrap_or_else(|| "unlimited".to_string());
216 let table = ConversationTable::new(config);
217
218 let wall_start = Instant::now();
219
220 if verbose {
221 eprintln!();
222 eprintln!("[+] stackforge flow extraction engine");
223 eprintln!("[+] Mode: streaming (packets read from disk on-the-fly)");
224 if has_budget {
225 eprintln!("[+] Memory budget: {budget_str}");
226 }
227 eprintln!("[+] Processing...");
228 eprintln!();
229 }
230
231 let mut last_report = Instant::now();
232
233 for (index, result) in packets.enumerate() {
234 let captured = result.map_err(FlowError::PacketError)?;
235 let timestamp = captured.metadata.timestamp;
236 table.ingest_packet(&captured.packet, timestamp, index)?;
237 if verbose && (index + 1) % interval == 0 {
240 let now = Instant::now();
241 let elapsed = wall_start.elapsed().as_secs_f64();
242 let delta = now.duration_since(last_report).as_secs_f64();
243 let overall_rate = (index + 1) as f64 / elapsed;
244 let interval_rate = interval as f64 / delta;
245 let mem = table.memory_usage();
246 let spill_note = if has_budget && table.spill_count() > 0 {
247 format!(" | {} spills", format_count(table.spill_count()))
248 } else {
249 String::new()
250 };
251 eprintln!(
252 " [{}] {} pkts | {} flows | {}/s (avg {}/s) | mem ~{}{}",
253 format_duration(elapsed),
254 format_count(index + 1),
255 format_count(table.conversation_count()),
256 format_count(interval_rate as usize),
257 format_count(overall_rate as usize),
258 format_bytes(mem),
259 spill_note,
260 );
261 last_report = now;
262 }
263 }
264
265 if verbose {
266 eprintln!();
267 eprintln!(
268 "[+] Finalizing (sorting {} flows)...",
269 format_count(table.conversation_count())
270 );
271 }
272 let conversations = table.into_conversations();
273 if verbose {
274 let elapsed = wall_start.elapsed().as_secs_f64();
275 eprintln!(
276 "[+] Complete: {} flows extracted",
277 format_count(conversations.len())
278 );
279 eprintln!("[+] Wall time: {}", format_duration(elapsed));
280
281 let (total_drops, flows_with_drops) = count_dropped_segments(&conversations);
283 if total_drops > 0 {
284 eprintln!(
285 "[!] Warning: {} TCP segments dropped across {} flows (buffer/fragment limits exceeded)",
286 format_count(total_drops as usize),
287 format_count(flows_with_drops),
288 );
289 eprintln!(
290 "[!] Tip: increase max_reassembly_buffer or max_ooo_fragments to capture more data"
291 );
292 }
293 eprintln!();
294 }
295 Ok(conversations)
296}
297
298fn count_dropped_segments(conversations: &[ConversationState]) -> (u64, usize) {
300 let mut total_drops: u64 = 0;
301 let mut flows_with_drops: usize = 0;
302 for conv in conversations {
303 if let ProtocolState::Tcp(ref tcp) = conv.protocol_state {
304 let drops = tcp.total_dropped_segments();
305 if drops > 0 {
306 total_drops += drops;
307 flows_with_drops += 1;
308 }
309 }
310 }
311 (total_drops, flows_with_drops)
312}
313
314pub fn extract_flows_from_file(
319 path: impl AsRef<Path>,
320 config: FlowConfig,
321) -> Result<Vec<ConversationState>, FlowError> {
322 let verbose = config.verbose;
323 let file_path = path.as_ref();
324 if verbose {
325 let file_size = std::fs::metadata(file_path)
326 .map(|m| format_bytes(m.len() as usize))
327 .unwrap_or_else(|_| "unknown".to_string());
328 eprintln!("[+] File: {} ({})", file_path.display(), file_size);
329 }
330 let iter = CaptureIterator::open(file_path).map_err(FlowError::PacketError)?;
331 extract_flows_streaming(iter, config)
332}
333
#[cfg(feature = "anonymize")]
/// Extracts conversations from in-memory packets with
/// [`extract_flows_with_config`], then anonymizes them in place according to
/// `anon_policy`.
///
/// # Errors
/// Propagates any [`FlowError`] from flow extraction. Anonymization is only
/// attempted after extraction succeeds.
pub fn extract_flows_anonymized(
    packets: &[CapturedPacket],
    config: FlowConfig,
    anon_policy: crate::anonymize::AnonymizationPolicy,
) -> Result<Vec<ConversationState>, FlowError> {
    use crate::anonymize::AnonymizationEngine;

    let mut flows = extract_flows_with_config(packets, config)?;
    AnonymizationEngine::new(anon_policy).anonymize_conversations(&mut flows);
    Ok(flows)
}
350
#[cfg(feature = "anonymize")]
/// Extracts conversations from the capture file at `path` with
/// [`extract_flows_from_file`], then anonymizes them in place according to
/// `anon_policy`.
///
/// # Errors
/// Propagates any [`FlowError`] from opening or processing the file.
/// Anonymization is only attempted after extraction succeeds.
pub fn extract_flows_from_file_anonymized(
    path: impl AsRef<Path>,
    config: FlowConfig,
    anon_policy: crate::anonymize::AnonymizationPolicy,
) -> Result<Vec<ConversationState>, FlowError> {
    use crate::anonymize::AnonymizationEngine;

    let mut flows = extract_flows_from_file(path, config)?;
    AnonymizationEngine::new(anon_policy).anonymize_conversations(&mut flows);
    Ok(flows)
}
363
364pub fn extract_zwave_flows(
372 packets: &[CapturedPacket],
373) -> Result<Vec<ConversationState>, FlowError> {
374 let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
375
376 for (index, captured) in packets.iter().enumerate() {
377 let timestamp = captured.metadata.timestamp;
378 let packet = &captured.packet;
379
380 if packet.get_layer(LayerKind::ZWave).is_none() {
382 continue;
383 }
384
385 let (key, direction) = match extract_zwave_key(packet) {
386 Ok(result) => result,
387 Err(_) => continue,
388 };
389
390 let byte_count = packet.as_bytes().len() as u64;
391 let buf = packet.as_bytes();
392
393 let conv = conversations.entry(key.clone()).or_insert_with(|| {
394 let mut state = ConversationState::new_zwave(key, timestamp);
395 if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
396 && let Some(zwave) = packet.zwave()
397 {
398 zw.home_id = zwave.home_id(buf).unwrap_or(0);
399 }
400 state
401 });
402
403 conv.record_packet(direction, byte_count, timestamp, index, false, false, true);
404
405 if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
407 && let Some(zwave) = packet.zwave()
408 {
409 if zwave.is_ack(buf) {
410 zw.ack_count += 1;
411 } else {
412 zw.command_count += 1;
413 }
414 }
415 }
416
417 let mut result: Vec<ConversationState> = conversations.into_values().collect();
418 result.sort_by_key(|c| c.start_time);
419 Ok(result)
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    const CLIENT: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
    const SERVER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 2);

    /// Wraps `packet` in a `CapturedPacket` stamped `timestamp_secs` seconds,
    /// with all other metadata defaulted.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        CapturedPacket {
            packet,
            metadata: PcapMetadata {
                timestamp: Duration::from_secs(timestamp_secs),
                orig_len: 0,
                ..Default::default()
            },
        }
    }

    /// Builds an Ethernet/IPv4/TCP packet. `flags` may contain `S`, `A`, `F`
    /// and `R` (other characters are ignored).
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let mut builder = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        for c in flags.chars() {
            builder = match c {
                'S' => builder.syn(),
                'A' => builder.ack(),
                'F' => builder.fin(),
                'R' => builder.rst(),
                _ => builder,
            };
        }

        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Tcp(builder))
            .build_packet()
    }

    /// Builds an Ethernet/IPv4/UDP packet.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Udp(
                UdpBuilder::new().src_port(sport).dst_port(dport),
            ))
            .build_packet()
    }

    /// Client SYN at t=1s followed by the server's SYN-ACK at t=2s on
    /// 10.0.0.1:12345 <-> 10.0.0.2:80.
    fn handshake() -> Vec<CapturedPacket> {
        vec![
            make_captured(tcp_packet(CLIENT, SERVER, 12345, 80, "S"), 1),
            make_captured(tcp_packet(SERVER, CLIENT, 80, 12345, "SA"), 2),
        ]
    }

    #[test]
    fn test_extract_flows_empty() {
        let result = extract_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = handshake();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].total_packets(), 2);
        assert_eq!(conversations[0].key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        let packets = vec![
            make_captured(tcp_packet(CLIENT, SERVER, 12345, 80, "S"), 1),
            make_captured(
                udp_packet(CLIENT, Ipv4Addr::new(10, 0, 0, 3), 54321, 53),
                2,
            ),
            make_captured(tcp_packet(SERVER, CLIENT, 80, 12345, "SA"), 3),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = handshake();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }

    #[test]
    fn test_extract_zwave_flows_ignores_non_zwave() {
        assert!(extract_zwave_flows(&[]).unwrap().is_empty());
        assert!(extract_zwave_flows(&handshake()).unwrap().is_empty());
    }

    #[test]
    fn test_format_count_groups_thousands() {
        assert_eq!(format_count(0), "0");
        assert_eq!(format_count(999), "999");
        assert_eq!(format_count(1000), "1,000");
        assert_eq!(format_count(123_456), "123,456");
        assert_eq!(format_count(1_234_567), "1,234,567");
    }

    #[test]
    fn test_format_bytes_units() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1024 * 1024), "1.0 MB");
        assert_eq!(format_bytes(1024 * 1024 * 1024), "1.00 GB");
    }

    #[test]
    fn test_format_duration_units() {
        assert_eq!(format_duration(5.0), "5.0s");
        assert_eq!(format_duration(90.0), "1m 30s");
        assert_eq!(format_duration(3725.0), "1h 2m");
    }
}