Skip to main content

stackforge_core/flow/
mod.rs

1//! Stateful conversation extraction and flow tracking.
2//!
3//! This module provides Wireshark-inspired bidirectional conversation tracking
4//! with TCP state machine analysis, stream reassembly, and UDP timeout-based
5//! pseudo-conversation tracking.
6//!
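//! # Architecture
//!
//! - **Canonical Key**: Bidirectional 5-tuple with deterministic IP/port ordering
//! - **Conversation Table**: DashMap-backed concurrent hash table
//! - **TCP State Machine**: RFC 793 (now RFC 9293) connection states with
//!   per-endpoint sequence tracking
//! - **TCP Reassembly**: BTreeMap-based out-of-order segment management
//! - **UDP Tracking**: Timeout-based pseudo-conversations
//! - **Spilling**: When `FlowConfig::memory_budget` is set, reassembly buffers are
//!   spilled to disk once the budget is exceeded
//! - **Z-Wave Tracking**: Non-IP conversations keyed by home ID and node pair,
//!   via `extract_zwave_flows`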
14//!
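//! # Usage
//!
//! ```rust,no_run
//! use stackforge_core::flow::{extract_flows, extract_flows_from_file, FlowConfig};
//! use stackforge_core::pcap::rdpcap;
//!
//! // Load the whole capture into memory, then extract conversations.
//! let packets = rdpcap("capture.pcap").unwrap();
//! let conversations = extract_flows(&packets).unwrap();
//! for conv in &conversations {
//!     println!("{}: {} packets", conv.key, conv.total_packets());
//! }
//!
//! // Or stream packets from disk with an explicit configuration.
//! let streamed = extract_flows_from_file("capture.pcap", FlowConfig::default()).unwrap();
//! println!("{} conversations", streamed.len());
//! ```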
27
28pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod spill;
33pub mod state;
34pub mod table;
35pub mod tcp_reassembly;
36pub mod tcp_state;
37pub mod udp_state;
38
39// Re-exports
40pub use config::FlowConfig;
41pub use error::FlowError;
42pub use icmp_state::IcmpFlowState;
43pub use key::{
44    CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
45};
46pub use state::{
47    ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
48};
49pub use table::ConversationTable;
50pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
51pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
52pub use udp_state::UdpFlowState;
53
54use std::collections::HashMap;
55use std::path::Path;
56
57use crate::error::PacketError;
58use crate::layer::LayerKind;
59use crate::pcap::{CaptureIterator, CapturedPacket};
60
61/// Extract bidirectional conversations from a list of captured packets.
62///
63/// This is the primary entry point for flow extraction. It processes all
64/// packets sequentially, groups them into bidirectional conversations using
65/// canonical key normalization, tracks TCP connection state and performs
66/// stream reassembly, and tracks UDP pseudo-conversations via timeouts.
67///
68/// Returns conversations sorted by start time.
69pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
70    extract_flows_with_config(packets, FlowConfig::default())
71}
72
73/// Extract flows with custom configuration.
74pub fn extract_flows_with_config(
75    packets: &[CapturedPacket],
76    config: FlowConfig,
77) -> Result<Vec<ConversationState>, FlowError> {
78    let table = ConversationTable::new(config);
79
80    for (index, captured) in packets.iter().enumerate() {
81        let timestamp = captured.metadata.timestamp;
82        table.ingest_packet(&captured.packet, timestamp, index)?;
83    }
84
85    Ok(table.into_conversations())
86}
87
88/// Extract flows from a streaming packet source (iterator).
89///
90/// Does not require all packets in memory simultaneously — each packet is
91/// processed and then dropped. Only conversation state (metadata + reassembly
92/// buffers) is retained.
93///
94/// If `config.memory_budget` is set, reassembly buffers will be spilled to
95/// disk when the budget is exceeded.
96pub fn extract_flows_streaming<I>(
97    packets: I,
98    config: FlowConfig,
99) -> Result<Vec<ConversationState>, FlowError>
100where
101    I: Iterator<Item = Result<CapturedPacket, PacketError>>,
102{
103    let table = ConversationTable::new(config);
104
105    for (index, result) in packets.enumerate() {
106        let captured = result.map_err(FlowError::PacketError)?;
107        let timestamp = captured.metadata.timestamp;
108        table.ingest_packet(&captured.packet, timestamp, index)?;
109        // `captured` is dropped here — packet memory freed immediately
110    }
111
112    Ok(table.into_conversations())
113}
114
115/// Extract flows directly from a capture file (PCAP or PcapNG).
116///
117/// Streams packets from disk — never loads the entire file into memory.
118/// The file format is auto-detected from magic bytes.
119pub fn extract_flows_from_file(
120    path: impl AsRef<Path>,
121    config: FlowConfig,
122) -> Result<Vec<ConversationState>, FlowError> {
123    let iter = CaptureIterator::open(path).map_err(FlowError::PacketError)?;
124    extract_flows_streaming(iter, config)
125}
126
127/// Extract Z-Wave conversations from a list of captured packets.
128///
129/// Z-Wave is a wireless protocol not carried over IP, so it needs its own
130/// flow extraction separate from the IP-based `extract_flows()`. Packets
131/// are grouped by home ID and canonical node pair (smaller node = `node_a`).
132///
133/// Non-Z-Wave packets are silently skipped.
134pub fn extract_zwave_flows(
135    packets: &[CapturedPacket],
136) -> Result<Vec<ConversationState>, FlowError> {
137    let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
138
139    for (index, captured) in packets.iter().enumerate() {
140        let timestamp = captured.metadata.timestamp;
141        let packet = &captured.packet;
142
143        // Skip packets without a Z-Wave layer
144        if packet.get_layer(LayerKind::ZWave).is_none() {
145            continue;
146        }
147
148        let (key, direction) = match extract_zwave_key(packet) {
149            Ok(result) => result,
150            Err(_) => continue,
151        };
152
153        let byte_count = packet.as_bytes().len() as u64;
154        let buf = packet.as_bytes();
155
156        let conv = conversations.entry(key.clone()).or_insert_with(|| {
157            let mut state = ConversationState::new_zwave(key, timestamp);
158            if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
159                && let Some(zwave) = packet.zwave()
160            {
161                zw.home_id = zwave.home_id(buf).unwrap_or(0);
162            }
163            state
164        });
165
166        conv.record_packet(direction, byte_count, timestamp, index, false, false);
167
168        // Track ACK vs command frames
169        if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
170            && let Some(zwave) = packet.zwave()
171        {
172            if zwave.is_ack(buf) {
173                zw.ack_count += 1;
174            } else {
175                zw.command_count += 1;
176            }
177        }
178    }
179
180    let mut result: Vec<ConversationState> = conversations.into_values().collect();
181    result.sort_by_key(|c| c.start_time);
182    Ok(result)
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    /// Wrap `packet` in a `CapturedPacket` stamped at `timestamp_secs` seconds.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        CapturedPacket {
            packet,
            metadata: PcapMetadata {
                timestamp: Duration::from_secs(timestamp_secs),
                orig_len: 0,
                ..Default::default()
            },
        }
    }

    /// Build an Ethernet/IPv4/TCP packet. Each character of `flags` sets one
    /// flag: `S`=SYN, `A`=ACK, `F`=FIN, `R`=RST; other characters are ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let mut builder = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        for c in flags.chars() {
            builder = match c {
                'S' => builder.syn(),
                'A' => builder.ack(),
                'F' => builder.fin(),
                'R' => builder.rst(),
                _ => builder,
            };
        }

        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Tcp(builder))
            .build_packet()
    }

    /// Build an Ethernet/IPv4/UDP packet.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(LayerStackEntry::Udp(
                UdpBuilder::new().src_port(sport).dst_port(dport),
            ))
            .build_packet()
    }

    /// Two packets forming the start of a TCP handshake:
    /// - a SYN 10.0.0.1:12345 -> 10.0.0.2:80 at t=1s,
    /// - followed by the SYN-ACK reply at t=2s.
    fn handshake_packets() -> Vec<CapturedPacket> {
        vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                2,
            ),
        ]
    }

    #[test]
    fn test_extract_flows_empty() {
        let result = extract_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = handshake_packets();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].total_packets(), 2);
        assert_eq!(conversations[0].key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        let packets = vec![
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 2),
                    12345,
                    80,
                    "S",
                ),
                1,
            ),
            make_captured(
                udp_packet(
                    Ipv4Addr::new(10, 0, 0, 1),
                    Ipv4Addr::new(10, 0, 0, 3),
                    54321,
                    53,
                ),
                2,
            ),
            make_captured(
                tcp_packet(
                    Ipv4Addr::new(10, 0, 0, 2),
                    Ipv4Addr::new(10, 0, 0, 1),
                    80,
                    12345,
                    "SA",
                ),
                3,
            ),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        // Sorted by start time
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = handshake_packets();

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }

    #[test]
    fn test_extract_flows_streaming_matches_slice() {
        let from_slice = extract_flows(&handshake_packets()).unwrap();
        let streamed = extract_flows_streaming(
            handshake_packets().into_iter().map(Ok),
            FlowConfig::default(),
        )
        .unwrap();

        assert_eq!(streamed.len(), from_slice.len());
        assert_eq!(streamed[0].total_packets(), from_slice[0].total_packets());
        assert_eq!(streamed[0].packet_indices, from_slice[0].packet_indices);
    }

    #[test]
    fn test_extract_zwave_flows_empty() {
        assert!(extract_zwave_flows(&[]).unwrap().is_empty());
    }

    #[test]
    fn test_extract_zwave_flows_skips_non_zwave() {
        // IP/TCP packets carry no Z-Wave layer and must be ignored.
        let conversations = extract_zwave_flows(&handshake_packets()).unwrap();
        assert!(conversations.is_empty());
    }
}