Skip to main content

stackforge_core/flow/
mod.rs

1//! Stateful conversation extraction and flow tracking.
2//!
3//! This module provides Wireshark-inspired bidirectional conversation tracking
4//! with TCP state machine analysis, stream reassembly, and UDP timeout-based
5//! pseudo-conversation tracking.
6//!
7//! # Architecture
8//!
9//! - **Canonical Key**: Bidirectional 5-tuple with deterministic IP/port ordering
10//! - **Conversation Table**: DashMap-backed concurrent hash table
11//! - **TCP State Machine**: RFC 793 connection states with per-endpoint sequence tracking
12//! - **TCP Reassembly**: BTreeMap-based out-of-order segment management
13//! - **UDP Tracking**: Timeout-based pseudo-conversations
14//!
15//! # Usage
16//!
17//! ```rust,no_run
18//! use stackforge_core::flow::{extract_flows, FlowConfig};
19//! use stackforge_core::pcap::rdpcap;
20//!
21//! let packets = rdpcap("capture.pcap").unwrap();
22//! let conversations = extract_flows(&packets).unwrap();
23//! for conv in &conversations {
24//!     println!("{}: {} packets", conv.key, conv.total_packets());
25//! }
26//! ```
27
28pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod spill;
33pub mod state;
34pub mod table;
35pub mod tcp_reassembly;
36pub mod tcp_state;
37pub mod udp_state;
38
39// Re-exports
40pub use config::FlowConfig;
41pub use error::FlowError;
42pub use icmp_state::IcmpFlowState;
43pub use key::{
44    CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
45};
46pub use state::{
47    ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
48};
49pub use table::ConversationTable;
50pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
51pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
52pub use udp_state::UdpFlowState;
53
54use std::collections::HashMap;
55use std::path::Path;
56use std::time::Instant;
57
58use crate::error::PacketError;
59use crate::layer::LayerKind;
60use crate::pcap::{CaptureIterator, CapturedPacket};
61
/// Render a byte count as a short human-readable size.
///
/// Uses binary (1024-based) units. Values of at least 1 GiB get two decimals
/// and a `GB` suffix. Values of at least 1 MiB or 1 KiB get one decimal and
/// `MB`/`KB`. Anything smaller is printed as a plain integer followed by `B`.
fn format_bytes(bytes: usize) -> String {
    // (threshold, suffix, decimal places), checked from largest to smallest.
    const UNITS: [(usize, &str, usize); 3] = [
        (1024 * 1024 * 1024, "GB", 2),
        (1024 * 1024, "MB", 1),
        (1024, "KB", 1),
    ];

    UNITS
        .iter()
        .find(|&&(scale, _, _)| bytes >= scale)
        .map_or_else(
            || format!("{bytes} B"),
            |&(scale, suffix, precision)| {
                format!("{:.*} {}", precision, bytes as f64 / scale as f64, suffix)
            },
        )
}
77
/// Render an unsigned count with `,` thousands separators (e.g. `1,234,567`).
fn format_count(n: usize) -> String {
    let digits = n.to_string();
    // The leading group holds 1-3 digits; every following group holds exactly 3.
    let head_len = match digits.len() % 3 {
        0 => 3,
        r => r,
    };
    let (head, tail) = digits.split_at(head_len);

    let mut out = String::with_capacity(digits.len() + digits.len() / 3);
    out.push_str(head);
    for group in tail.as_bytes().chunks(3) {
        out.push(',');
        // `digits` is pure ASCII, so each byte maps directly to one char.
        out.extend(group.iter().map(|&b| char::from(b)));
    }
    out
}
90
/// Format a duration given in seconds as a compact human-readable string.
///
/// * `>= 3600` seconds: whole hours and minutes, e.g. `"1h 1m"`.
/// * `>= 60` seconds: whole minutes and seconds, e.g. `"1m 30s"`.
/// * otherwise: seconds with one decimal place, e.g. `"12.3s"`.
///
/// In the hour and minute forms, the lower-order component is truncated
/// rather than rounded. It therefore never shows a full unit, e.g. 119.6 s
/// renders as `"1m 59s"`, not `"1m 60s"`.
fn format_duration(secs: f64) -> String {
    if secs >= 3600.0 {
        let h = (secs / 3600.0).floor();
        let m = ((secs % 3600.0) / 60.0).floor();
        format!("{h:.0}h {m:.0}m")
    } else if secs >= 60.0 {
        let m = (secs / 60.0).floor();
        // Truncate: formatting the raw remainder with `{:.0}` would round
        // e.g. 59.6 up to "60", producing an impossible "1m 60s".
        let s = (secs % 60.0).floor();
        format!("{m:.0}m {s:.0}s")
    } else {
        format!("{secs:.1}s")
    }
}
105
106/// Extract bidirectional conversations from a list of captured packets.
107///
108/// This is the primary entry point for flow extraction. It processes all
109/// packets sequentially, groups them into bidirectional conversations using
110/// canonical key normalization, tracks TCP connection state and performs
111/// stream reassembly, and tracks UDP pseudo-conversations via timeouts.
112///
113/// Returns conversations sorted by start time.
114pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
115    extract_flows_with_config(packets, FlowConfig::default())
116}
117
118/// Extract flows with custom configuration.
119pub fn extract_flows_with_config(
120    packets: &[CapturedPacket],
121    config: FlowConfig,
122) -> Result<Vec<ConversationState>, FlowError> {
123    let verbose = config.verbose;
124    let interval = config.progress_interval.max(1);
125    let total = packets.len();
126    let table = ConversationTable::new(config);
127
128    let wall_start = Instant::now();
129
130    if verbose {
131        eprintln!();
132        eprintln!("[+] stackforge flow extraction engine");
133        eprintln!("[+] Input: {} packets (in-memory)", format_count(total));
134        eprintln!("[+] Processing...");
135        eprintln!();
136    }
137
138    for (index, captured) in packets.iter().enumerate() {
139        let timestamp = captured.metadata.timestamp;
140        table.ingest_packet(&captured.packet, timestamp, index)?;
141
142        if verbose && (index + 1) % interval == 0 {
143            let elapsed = wall_start.elapsed().as_secs_f64();
144            let rate = (index + 1) as f64 / elapsed;
145            let pct = (index + 1) as f64 / total as f64 * 100.0;
146            let remaining = (total - index - 1) as f64 / rate;
147            let mem = table.memory_usage();
148            eprintln!(
149                "    [{:5.1}%] {} pkts | {} flows | {}/s | mem ~{} | ETA {}",
150                pct,
151                format_count(index + 1),
152                format_count(table.conversation_count()),
153                format_count(rate as usize),
154                format_bytes(mem),
155                format_duration(remaining),
156            );
157        }
158    }
159
160    if verbose {
161        eprintln!();
162    }
163    let conversations = table.into_conversations();
164    if verbose {
165        let elapsed = wall_start.elapsed().as_secs_f64();
166        let rate = total as f64 / elapsed;
167        eprintln!(
168            "[+] Complete: {} packets -> {} flows",
169            format_count(total),
170            format_count(conversations.len())
171        );
172        eprintln!(
173            "[+] Wall time: {} ({}/s avg)",
174            format_duration(elapsed),
175            format_count(rate as usize)
176        );
177
178        let (total_drops, flows_with_drops) = count_dropped_segments(&conversations);
179        if total_drops > 0 {
180            eprintln!(
181                "[!] Warning: {} TCP segments dropped across {} flows (buffer/fragment limits exceeded)",
182                format_count(total_drops as usize),
183                format_count(flows_with_drops),
184            );
185            eprintln!(
186                "[!] Tip: increase max_reassembly_buffer or max_ooo_fragments to capture more data"
187            );
188        }
189        eprintln!();
190    }
191    Ok(conversations)
192}
193
194/// Extract flows from a streaming packet source (iterator).
195///
196/// Does not require all packets in memory simultaneously — each packet is
197/// processed and then dropped. Only conversation state (metadata + reassembly
198/// buffers) is retained.
199///
200/// If `config.memory_budget` is set, reassembly buffers will be spilled to
201/// disk when the budget is exceeded.
202pub fn extract_flows_streaming<I>(
203    packets: I,
204    config: FlowConfig,
205) -> Result<Vec<ConversationState>, FlowError>
206where
207    I: Iterator<Item = Result<CapturedPacket, PacketError>>,
208{
209    let verbose = config.verbose;
210    let interval = config.progress_interval.max(1);
211    let has_budget = config.memory_budget.is_some();
212    let budget_str = config
213        .memory_budget
214        .map(|b| format_bytes(b))
215        .unwrap_or_else(|| "unlimited".to_string());
216    let table = ConversationTable::new(config);
217
218    let wall_start = Instant::now();
219
220    if verbose {
221        eprintln!();
222        eprintln!("[+] stackforge flow extraction engine");
223        eprintln!("[+] Mode: streaming (packets read from disk on-the-fly)");
224        if has_budget {
225            eprintln!("[+] Memory budget: {budget_str}");
226        }
227        eprintln!("[+] Processing...");
228        eprintln!();
229    }
230
231    let mut last_report = Instant::now();
232
233    for (index, result) in packets.enumerate() {
234        let captured = result.map_err(FlowError::PacketError)?;
235        let timestamp = captured.metadata.timestamp;
236        table.ingest_packet(&captured.packet, timestamp, index)?;
237        // `captured` is dropped here — packet memory freed immediately
238
239        if verbose && (index + 1) % interval == 0 {
240            let now = Instant::now();
241            let elapsed = wall_start.elapsed().as_secs_f64();
242            let delta = now.duration_since(last_report).as_secs_f64();
243            let overall_rate = (index + 1) as f64 / elapsed;
244            let interval_rate = interval as f64 / delta;
245            let mem = table.memory_usage();
246            let spill_note = if has_budget && table.spill_count() > 0 {
247                format!(" | {} spills", format_count(table.spill_count()))
248            } else {
249                String::new()
250            };
251            eprintln!(
252                "    [{}] {} pkts | {} flows | {}/s (avg {}/s) | mem ~{}{}",
253                format_duration(elapsed),
254                format_count(index + 1),
255                format_count(table.conversation_count()),
256                format_count(interval_rate as usize),
257                format_count(overall_rate as usize),
258                format_bytes(mem),
259                spill_note,
260            );
261            last_report = now;
262        }
263    }
264
265    if verbose {
266        eprintln!();
267        eprintln!(
268            "[+] Finalizing (sorting {} flows)...",
269            format_count(table.conversation_count())
270        );
271    }
272    let conversations = table.into_conversations();
273    if verbose {
274        let elapsed = wall_start.elapsed().as_secs_f64();
275        eprintln!(
276            "[+] Complete: {} flows extracted",
277            format_count(conversations.len())
278        );
279        eprintln!("[+] Wall time: {}", format_duration(elapsed));
280
281        // Report reassembly drops
282        let (total_drops, flows_with_drops) = count_dropped_segments(&conversations);
283        if total_drops > 0 {
284            eprintln!(
285                "[!] Warning: {} TCP segments dropped across {} flows (buffer/fragment limits exceeded)",
286                format_count(total_drops as usize),
287                format_count(flows_with_drops),
288            );
289            eprintln!(
290                "[!] Tip: increase max_reassembly_buffer or max_ooo_fragments to capture more data"
291            );
292        }
293        eprintln!();
294    }
295    Ok(conversations)
296}
297
298/// Count total dropped TCP segments across all conversations.
299fn count_dropped_segments(conversations: &[ConversationState]) -> (u64, usize) {
300    let mut total_drops: u64 = 0;
301    let mut flows_with_drops: usize = 0;
302    for conv in conversations {
303        if let ProtocolState::Tcp(ref tcp) = conv.protocol_state {
304            let drops = tcp.total_dropped_segments();
305            if drops > 0 {
306                total_drops += drops;
307                flows_with_drops += 1;
308            }
309        }
310    }
311    (total_drops, flows_with_drops)
312}
313
314/// Extract flows directly from a capture file (PCAP or PcapNG).
315///
316/// Streams packets from disk — never loads the entire file into memory.
317/// The file format is auto-detected from magic bytes.
318pub fn extract_flows_from_file(
319    path: impl AsRef<Path>,
320    config: FlowConfig,
321) -> Result<Vec<ConversationState>, FlowError> {
322    let verbose = config.verbose;
323    let file_path = path.as_ref();
324    if verbose {
325        let file_size = std::fs::metadata(file_path)
326            .map(|m| format_bytes(m.len() as usize))
327            .unwrap_or_else(|_| "unknown".to_string());
328        eprintln!("[+] File: {} ({})", file_path.display(), file_size);
329    }
330    let iter = CaptureIterator::open(file_path).map_err(FlowError::PacketError)?;
331    extract_flows_streaming(iter, config)
332}
333
/// Extract flows from in-memory packets and anonymize the result.
///
/// Runs [`extract_flows_with_config`]. Only if that succeeds are the
/// conversations passed through an
/// [`AnonymizationEngine`](crate::anonymize::AnonymizationEngine) built from
/// `anon_policy`.
///
/// # Errors
///
/// Propagates any [`FlowError`] from [`extract_flows_with_config`].
#[cfg(feature = "anonymize")]
pub fn extract_flows_anonymized(
    packets: &[CapturedPacket],
    config: FlowConfig,
    anon_policy: crate::anonymize::AnonymizationPolicy,
) -> Result<Vec<ConversationState>, FlowError> {
    extract_flows_with_config(packets, config).map(|mut conversations| {
        let mut engine = crate::anonymize::AnonymizationEngine::new(anon_policy);
        engine.anonymize_conversations(&mut conversations);
        conversations
    })
}
350
/// Extract flows from a capture file and anonymize the result.
///
/// Runs [`extract_flows_from_file`]. Only if that succeeds are the
/// conversations passed through an
/// [`AnonymizationEngine`](crate::anonymize::AnonymizationEngine) built from
/// `anon_policy`.
///
/// # Errors
///
/// Propagates any [`FlowError`] from [`extract_flows_from_file`].
#[cfg(feature = "anonymize")]
pub fn extract_flows_from_file_anonymized(
    path: impl AsRef<Path>,
    config: FlowConfig,
    anon_policy: crate::anonymize::AnonymizationPolicy,
) -> Result<Vec<ConversationState>, FlowError> {
    extract_flows_from_file(path, config).map(|mut conversations| {
        let mut engine = crate::anonymize::AnonymizationEngine::new(anon_policy);
        engine.anonymize_conversations(&mut conversations);
        conversations
    })
}
363
364/// Extract Z-Wave conversations from a list of captured packets.
365///
366/// Z-Wave is a wireless protocol not carried over IP, so it needs its own
367/// flow extraction separate from the IP-based `extract_flows()`. Packets
368/// are grouped by home ID and canonical node pair (smaller node = `node_a`).
369///
370/// Non-Z-Wave packets are silently skipped.
371pub fn extract_zwave_flows(
372    packets: &[CapturedPacket],
373) -> Result<Vec<ConversationState>, FlowError> {
374    let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
375
376    for (index, captured) in packets.iter().enumerate() {
377        let timestamp = captured.metadata.timestamp;
378        let packet = &captured.packet;
379
380        // Skip packets without a Z-Wave layer
381        if packet.get_layer(LayerKind::ZWave).is_none() {
382            continue;
383        }
384
385        let (key, direction) = match extract_zwave_key(packet) {
386            Ok(result) => result,
387            Err(_) => continue,
388        };
389
390        let byte_count = packet.as_bytes().len() as u64;
391        let buf = packet.as_bytes();
392
393        let conv = conversations.entry(key.clone()).or_insert_with(|| {
394            let mut state = ConversationState::new_zwave(key, timestamp);
395            if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
396                && let Some(zwave) = packet.zwave()
397            {
398                zw.home_id = zwave.home_id(buf).unwrap_or(0);
399            }
400            state
401        });
402
403        conv.record_packet(direction, byte_count, timestamp, index, false, false, true);
404
405        // Track ACK vs command frames
406        if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
407            && let Some(zwave) = packet.zwave()
408        {
409            if zwave.is_ack(buf) {
410                zw.ack_count += 1;
411            } else {
412                zw.command_count += 1;
413            }
414        }
415    }
416
417    let mut result: Vec<ConversationState> = conversations.into_values().collect();
418    result.sort_by_key(|c| c.start_time);
419    Ok(result)
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    /// Wrap a packet in a `CapturedPacket` whose timestamp is `timestamp_secs`.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        CapturedPacket {
            packet,
            metadata: PcapMetadata {
                timestamp: Duration::from_secs(timestamp_secs),
                orig_len: 0,
                ..Default::default()
            },
        }
    }

    /// Build an Ethernet/IPv4/TCP packet. `flags` may contain `S`, `A`, `F`
    /// and `R`; any other characters are ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let mut builder = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        for c in flags.chars() {
            builder = match c {
                'S' => builder.syn(),
                'A' => builder.ack(),
                'F' => builder.fin(),
                'R' => builder.rst(),
                _ => builder,
            };
        }

        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Tcp(builder))
            .build_packet()
    }

    /// Build an Ethernet/IPv4/UDP packet.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Udp(
                UdpBuilder::new().src_port(sport).dst_port(dport),
            ))
            .build_packet()
    }

    #[test]
    fn test_extract_flows_empty() {
        let result = extract_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                2,
            ),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].total_packets(), 2);
        assert_eq!(conversations[0].key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        let packets = vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                udp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 3),
                    54321,
                    53,
                ),
                2,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                3,
            ),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        // Sorted by start time
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                2,
            ),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }

    #[test]
    fn test_extract_zwave_flows_skips_non_zwave_packets() {
        // An IP/TCP packet has no Z-Wave layer, so no conversation is produced.
        let packets = vec![make_captured(
            tcp_packet(
                Ipv4Addr::new(10, 0, 0, 1),
                Ipv4Addr::new(10, 0, 0, 2),
                12345,
                80,
                "S",
            ),
            1,
        )];

        let conversations = extract_zwave_flows(&packets).unwrap();
        assert!(conversations.is_empty());
    }

    #[test]
    fn test_count_dropped_segments_empty() {
        assert_eq!(count_dropped_segments(&[]), (0, 0));
    }

    #[test]
    fn test_format_count_groups_thousands() {
        assert_eq!(format_count(0), "0");
        assert_eq!(format_count(999), "999");
        assert_eq!(format_count(1_000), "1,000");
        assert_eq!(format_count(100_000), "100,000");
        assert_eq!(format_count(1_234_567), "1,234,567");
    }

    #[test]
    fn test_format_bytes_units() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(5 * 1024 * 1024), "5.0 MB");
        assert_eq!(format_bytes(1024 * 1024 * 1024), "1.00 GB");
    }

    #[test]
    fn test_format_duration_units() {
        assert_eq!(format_duration(12.34), "12.3s");
        assert_eq!(format_duration(90.0), "1m 30s");
        assert_eq!(format_duration(3661.0), "1h 1m");
    }
}
600}