Skip to main content

stackforge_core/flow/
mod.rs

1//! Stateful conversation extraction and flow tracking.
2//!
3//! This module provides Wireshark-inspired bidirectional conversation tracking
4//! with TCP state machine analysis, stream reassembly, and UDP timeout-based
5//! pseudo-conversation tracking.
6//!
7//! # Architecture
8//!
9//! - **Canonical Key**: Bidirectional 5-tuple with deterministic IP/port ordering
10//! - **Conversation Table**: DashMap-backed concurrent hash table
11//! - **TCP State Machine**: RFC 793 connection states with per-endpoint sequence tracking
12//! - **TCP Reassembly**: BTreeMap-based out-of-order segment management
13//! - **UDP Tracking**: Timeout-based pseudo-conversations
14//!
15//! # Usage
16//!
17//! ```rust,no_run
18//! use stackforge_core::flow::{extract_flows, FlowConfig};
19//! use stackforge_core::pcap::rdpcap;
20//!
21//! let packets = rdpcap("capture.pcap").unwrap();
22//! let conversations = extract_flows(&packets).unwrap();
23//! for conv in &conversations {
24//!     println!("{}: {} packets", conv.key, conv.total_packets());
25//! }
26//! ```
27
28pub mod config;
29pub mod error;
30pub mod icmp_state;
31pub mod key;
32pub mod state;
33pub mod table;
34pub mod tcp_reassembly;
35pub mod tcp_state;
36pub mod udp_state;
37
38// Re-exports
39pub use config::FlowConfig;
40pub use error::FlowError;
41pub use icmp_state::IcmpFlowState;
42pub use key::{
43    CanonicalKey, FlowDirection, TransportProtocol, ZWaveKey, extract_key, extract_zwave_key,
44};
45pub use state::{
46    ConversationState, ConversationStatus, DirectionStats, ProtocolState, ZWaveFlowState,
47};
48pub use table::ConversationTable;
49pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
50pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
51pub use udp_state::UdpFlowState;
52
53use std::collections::HashMap;
54
55use crate::layer::LayerKind;
56use crate::pcap::CapturedPacket;
57
58/// Extract bidirectional conversations from a list of captured packets.
59///
60/// This is the primary entry point for flow extraction. It processes all
61/// packets sequentially, groups them into bidirectional conversations using
62/// canonical key normalization, tracks TCP connection state and performs
63/// stream reassembly, and tracks UDP pseudo-conversations via timeouts.
64///
65/// Returns conversations sorted by start time.
66pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
67    extract_flows_with_config(packets, FlowConfig::default())
68}
69
70/// Extract flows with custom configuration.
71pub fn extract_flows_with_config(
72    packets: &[CapturedPacket],
73    config: FlowConfig,
74) -> Result<Vec<ConversationState>, FlowError> {
75    let table = ConversationTable::new(config);
76
77    for (index, captured) in packets.iter().enumerate() {
78        let timestamp = captured.metadata.timestamp;
79        table.ingest_packet(&captured.packet, timestamp, index)?;
80    }
81
82    Ok(table.into_conversations())
83}
84
85/// Extract Z-Wave conversations from a list of captured packets.
86///
87/// Z-Wave is a wireless protocol not carried over IP, so it needs its own
88/// flow extraction separate from the IP-based `extract_flows()`. Packets
89/// are grouped by home ID and canonical node pair (smaller node = `node_a`).
90///
91/// Non-Z-Wave packets are silently skipped.
92pub fn extract_zwave_flows(
93    packets: &[CapturedPacket],
94) -> Result<Vec<ConversationState>, FlowError> {
95    let mut conversations: HashMap<ZWaveKey, ConversationState> = HashMap::new();
96
97    for (index, captured) in packets.iter().enumerate() {
98        let timestamp = captured.metadata.timestamp;
99        let packet = &captured.packet;
100
101        // Skip packets without a Z-Wave layer
102        if packet.get_layer(LayerKind::ZWave).is_none() {
103            continue;
104        }
105
106        let (key, direction) = match extract_zwave_key(packet) {
107            Ok(result) => result,
108            Err(_) => continue,
109        };
110
111        let byte_count = packet.as_bytes().len() as u64;
112        let buf = packet.as_bytes();
113
114        let conv = conversations.entry(key.clone()).or_insert_with(|| {
115            let mut state = ConversationState::new_zwave(key, timestamp);
116            if let ProtocolState::ZWave(ref mut zw) = state.protocol_state
117                && let Some(zwave) = packet.zwave()
118            {
119                zw.home_id = zwave.home_id(buf).unwrap_or(0);
120            }
121            state
122        });
123
124        conv.record_packet(direction, byte_count, timestamp, index, false, false);
125
126        // Track ACK vs command frames
127        if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state
128            && let Some(zwave) = packet.zwave()
129        {
130            if zwave.is_ack(buf) {
131                zw.ack_count += 1;
132            } else {
133                zw.command_count += 1;
134            }
135        }
136    }
137
138    let mut result: Vec<ConversationState> = conversations.into_values().collect();
139    result.sort_by_key(|c| c.start_time);
140    Ok(result)
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    // Fixture endpoints shared by all tests in this module.
    const CLIENT: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
    const SERVER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 2);
    const DNS_PEER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 3);
    const CLIENT_PORT: u16 = 12345;
    const SERVER_PORT: u16 = 80;

    /// Wrap a packet with capture metadata carrying a whole-second timestamp.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        let metadata = PcapMetadata {
            timestamp: Duration::from_secs(timestamp_secs),
            orig_len: 0,
        };
        CapturedPacket { packet, metadata }
    }

    /// Ethernet layer used by every fixture packet.
    fn ethernet_layer() -> LayerStackEntry {
        LayerStackEntry::Ethernet(
            EthernetBuilder::new()
                .dst(MacAddress::BROADCAST)
                .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
        )
    }

    /// IPv4 layer between the two given addresses.
    fn ipv4_layer(src_ip: Ipv4Addr, dst_ip: Ipv4Addr) -> LayerStackEntry {
        LayerStackEntry::Ipv4(Ipv4Builder::new().src(src_ip).dst(dst_ip))
    }

    /// Build an Ethernet/IPv4/TCP packet.
    ///
    /// Each character of `flags` sets one TCP flag (`S`, `A`, `F`, `R`); any
    /// other character is ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let base = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        let tcp = flags.chars().fold(base, |tcp, flag| match flag {
            'S' => tcp.syn(),
            'A' => tcp.ack(),
            'F' => tcp.fin(),
            'R' => tcp.rst(),
            _ => tcp,
        });

        LayerStack::new()
            .push(ethernet_layer())
            .push(ipv4_layer(src_ip, dst_ip))
            .push(LayerStackEntry::Tcp(tcp))
            .build_packet()
    }

    /// Build an Ethernet/IPv4/UDP packet.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        let udp = UdpBuilder::new().src_port(sport).dst_port(dport);

        LayerStack::new()
            .push(ethernet_layer())
            .push(ipv4_layer(src_ip, dst_ip))
            .push(LayerStackEntry::Udp(udp))
            .build_packet()
    }

    /// SYN from the client to the server.
    fn client_syn() -> Packet {
        tcp_packet(CLIENT, SERVER, CLIENT_PORT, SERVER_PORT, "S")
    }

    /// SYN-ACK from the server back to the client.
    fn server_syn_ack() -> Packet {
        tcp_packet(SERVER, CLIENT, SERVER_PORT, CLIENT_PORT, "SA")
    }

    #[test]
    fn test_extract_flows_empty() {
        let conversations = extract_flows(&[]).unwrap();
        assert!(conversations.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = vec![
            make_captured(client_syn(), 1),
            make_captured(server_syn_ack(), 2),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].total_packets(), 2);
        assert_eq!(conversations[0].key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        let packets = vec![
            make_captured(client_syn(), 1),
            make_captured(udp_packet(CLIENT, DNS_PEER, 54321, 53), 2),
            make_captured(server_syn_ack(), 3),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        // Sorted by start time
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = vec![
            make_captured(client_syn(), 1),
            make_captured(server_syn_ack(), 2),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }
}