Skip to main content

stackforge_core/flow/
mod.rs

//! Stateful conversation extraction and flow tracking.
//!
//! This module provides Wireshark-inspired bidirectional conversation tracking
//! with TCP state machine analysis, stream reassembly, and UDP timeout-based
//! pseudo-conversation tracking.
//!
//! # Architecture
//!
//! - **Canonical Key**: Bidirectional 5-tuple with deterministic IP/port ordering
//! - **Conversation Table**: DashMap-backed concurrent hash table
//! - **TCP State Machine**: RFC 793 connection states with per-endpoint sequence tracking
//! - **TCP Reassembly**: BTreeMap-based out-of-order segment management
//! - **UDP Tracking**: Timeout-based pseudo-conversations
//! - **Z-Wave Tracking**: Non-IP conversations grouped by home ID and node pair
//!   (see [`extract_zwave_flows`])
//!
//! # Usage
//!
//! For captures too large to hold in memory, use [`extract_flows_from_file`],
//! which streams packets from disk instead of loading them all at once.
//!
//! ```rust,no_run
//! use stackforge_core::flow::{extract_flows, FlowConfig};
//! use stackforge_core::pcap::rdpcap;
//!
//! let packets = rdpcap("capture.pcap").unwrap();
//! let conversations = extract_flows(&packets).unwrap();
//! for conv in &conversations {
//!     println!("{}: {} packets", conv.key, conv.total_packets());
//! }
//! ```
27
28pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod spill;
33pub mod state;
34pub mod table;
35pub mod tcp_reassembly;
36pub mod tcp_state;
37pub mod udp_state;
38
39// Re-exports
40pub use config::FlowConfig;
41pub use error::FlowError;
42pub use icmp_state::IcmpFlowState;
43pub use key::{
44    CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
45};
46pub use state::{
47    ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
48};
49pub use table::ConversationTable;
50pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
51pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
52pub use udp_state::UdpFlowState;
53
54use std::collections::HashMap;
55use std::path::Path;
56
57use crate::error::PacketError;
58use crate::layer::LayerKind;
59use crate::pcap::{CaptureIterator, CapturedPacket};
60
61/// Extract bidirectional conversations from a list of captured packets.
62///
63/// This is the primary entry point for flow extraction. It processes all
64/// packets sequentially, groups them into bidirectional conversations using
65/// canonical key normalization, tracks TCP connection state and performs
66/// stream reassembly, and tracks UDP pseudo-conversations via timeouts.
67///
68/// Returns conversations sorted by start time.
69pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
70    extract_flows_with_config(packets, FlowConfig::default())
71}
72
73/// Extract flows with custom configuration.
74pub fn extract_flows_with_config(
75    packets: &[CapturedPacket],
76    config: FlowConfig,
77) -> Result<Vec<ConversationState>, FlowError> {
78    let verbose = config.verbose;
79    let total = packets.len();
80    let table = ConversationTable::new(config);
81
82    if verbose {
83        eprintln!("[stackforge] Starting flow extraction ({total} packets)...");
84    }
85
86    for (index, captured) in packets.iter().enumerate() {
87        let timestamp = captured.metadata.timestamp;
88        table.ingest_packet(&captured.packet, timestamp, index)?;
89
90        if verbose && (index + 1) % 10_000 == 0 {
91            eprintln!(
92                "[stackforge] Processed {}/{total} packets ({} flows so far)",
93                index + 1,
94                table.conversation_count(),
95            );
96        }
97    }
98
99    let conversations = table.into_conversations();
100    if verbose {
101        eprintln!(
102            "[stackforge] Flow extraction complete: {total} packets -> {} conversations",
103            conversations.len(),
104        );
105    }
106    Ok(conversations)
107}
108
109/// Extract flows from a streaming packet source (iterator).
110///
111/// Does not require all packets in memory simultaneously — each packet is
112/// processed and then dropped. Only conversation state (metadata + reassembly
113/// buffers) is retained.
114///
115/// If `config.memory_budget` is set, reassembly buffers will be spilled to
116/// disk when the budget is exceeded.
117pub fn extract_flows_streaming<I>(
118    packets: I,
119    config: FlowConfig,
120) -> Result<Vec<ConversationState>, FlowError>
121where
122    I: Iterator<Item = Result<CapturedPacket, PacketError>>,
123{
124    let verbose = config.verbose;
125    let table = ConversationTable::new(config);
126
127    if verbose {
128        eprintln!("[stackforge] Starting streaming flow extraction...");
129    }
130
131    for (index, result) in packets.enumerate() {
132        let captured = result.map_err(FlowError::PacketError)?;
133        let timestamp = captured.metadata.timestamp;
134        table.ingest_packet(&captured.packet, timestamp, index)?;
135        // `captured` is dropped here — packet memory freed immediately
136
137        if verbose && (index + 1) % 10_000 == 0 {
138            eprintln!(
139                "[stackforge] Processed {} packets ({} flows so far)",
140                index + 1,
141                table.conversation_count(),
142            );
143        }
144    }
145
146    let conversations = table.into_conversations();
147    if verbose {
148        eprintln!(
149            "[stackforge] Flow extraction complete: {} conversations",
150            conversations.len(),
151        );
152    }
153    Ok(conversations)
154}
155
156/// Extract flows directly from a capture file (PCAP or PcapNG).
157///
158/// Streams packets from disk — never loads the entire file into memory.
159/// The file format is auto-detected from magic bytes.
160pub fn extract_flows_from_file(
161    path: impl AsRef<Path>,
162    config: FlowConfig,
163) -> Result<Vec<ConversationState>, FlowError> {
164    let verbose = config.verbose;
165    if verbose {
166        eprintln!(
167            "[stackforge] Opening capture file: {}",
168            path.as_ref().display(),
169        );
170    }
171    let iter = CaptureIterator::open(path).map_err(FlowError::PacketError)?;
172    extract_flows_streaming(iter, config)
173}
174
175/// Extract Z-Wave conversations from a list of captured packets.
176///
177/// Z-Wave is a wireless protocol not carried over IP, so it needs its own
178/// flow extraction separate from the IP-based `extract_flows()`. Packets
179/// are grouped by home ID and canonical node pair (smaller node = `node_a`).
180///
181/// Non-Z-Wave packets are silently skipped.
182pub fn extract_zwave_flows(
183    packets: &[CapturedPacket],
184) -> Result<Vec<ConversationState>, FlowError> {
185    let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
186
187    for (index, captured) in packets.iter().enumerate() {
188        let timestamp = captured.metadata.timestamp;
189        let packet = &captured.packet;
190
191        // Skip packets without a Z-Wave layer
192        if packet.get_layer(LayerKind::ZWave).is_none() {
193            continue;
194        }
195
196        let (key, direction) = match extract_zwave_key(packet) {
197            Ok(result) => result,
198            Err(_) => continue,
199        };
200
201        let byte_count = packet.as_bytes().len() as u64;
202        let buf = packet.as_bytes();
203
204        let conv = conversations.entry(key.clone()).or_insert_with(|| {
205            let mut state = ConversationState::new_zwave(key, timestamp);
206            if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
207                && let Some(zwave) = packet.zwave()
208            {
209                zw.home_id = zwave.home_id(buf).unwrap_or(0);
210            }
211            state
212        });
213
214        conv.record_packet(direction, byte_count, timestamp, index, false, false);
215
216        // Track ACK vs command frames
217        if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
218            && let Some(zwave) = packet.zwave()
219        {
220            if zwave.is_ack(buf) {
221                zw.ack_count += 1;
222            } else {
223                zw.command_count += 1;
224            }
225        }
226    }
227
228    let mut result: Vec<ConversationState> = conversations.into_values().collect();
229    result.sort_by_key(|c| c.start_time);
230    Ok(result)
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    /// Wraps `packet` in a `CapturedPacket` with a capture timestamp of `timestamp_secs` seconds.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        CapturedPacket {
            packet,
            metadata: PcapMetadata {
                timestamp: Duration::from_secs(timestamp_secs),
                orig_len: 0,
                ..Default::default()
            },
        }
    }

    /// Builds an Ethernet/IPv4/TCP packet. `flags` is a string of S/A/F/R characters;
    /// any other character is ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let mut builder = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        for c in flags.chars() {
            builder = match c {
                'S' => builder.syn(),
                'A' => builder.ack(),
                'F' => builder.fin(),
                'R' => builder.rst(),
                _ => builder,
            };
        }

        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Tcp(builder))
            .build_packet()
    }

    /// Builds an Ethernet/IPv4/UDP packet.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Udp(
                UdpBuilder::new().src_port(sport).dst_port(dport),
            ))
            .build_packet()
    }

    /// A SYN from 10.0.0.1:12345 to 10.0.0.2:80 at t=1s, answered by a SYN-ACK at t=2s.
    fn handshake_packets() -> Vec<CapturedPacket> {
        vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                2,
            ),
        ]
    }

    #[test]
    fn test_extract_flows_empty() {
        let result = extract_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = handshake_packets();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].total_packets(), 2);
        assert_eq!(conversations[0].key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        let packets = vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                udp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 3),
                    54321,
                    53,
                ),
                2,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                3,
            ),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        // Sorted by start time
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = handshake_packets();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }

    #[test]
    fn test_extract_flows_streaming_matches_batch() {
        let batch = extract_flows(&handshake_packets()).unwrap();
        let streamed = extract_flows_streaming(
            handshake_packets()
                .into_iter()
                .map(Ok::<CapturedPacket, PacketError>),
            FlowConfig::default(),
        )
        .unwrap();

        assert_eq!(streamed.len(), batch.len());
        assert_eq!(streamed[0].total_packets(), batch[0].total_packets());
        assert_eq!(streamed[0].packet_indices, batch[0].packet_indices);
    }

    #[test]
    fn test_extract_zwave_flows_empty() {
        let result = extract_zwave_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_zwave_flows_skips_non_zwave_packets() {
        let conversations = extract_zwave_flows(&handshake_packets()).unwrap();
        assert!(conversations.is_empty());
    }
}