Skip to main content

stackforge_core/flow/
mod.rs

1//! Stateful conversation extraction and flow tracking.
2//!
3//! This module provides Wireshark-inspired bidirectional conversation tracking
4//! with TCP state machine analysis, stream reassembly, and UDP timeout-based
5//! pseudo-conversation tracking.
6//!
7//! # Architecture
8//!
9//! - **Canonical Key**: Bidirectional 5-tuple with deterministic IP/port ordering
10//! - **Conversation Table**: DashMap-backed concurrent hash table
11//! - **TCP State Machine**: RFC 793 connection states with per-endpoint sequence tracking
12//! - **TCP Reassembly**: BTreeMap-based out-of-order segment management
13//! - **UDP Tracking**: Timeout-based pseudo-conversations
14//!
15//! # Usage
16//!
17//! ```rust,no_run
18//! use stackforge_core::flow::{extract_flows, FlowConfig};
19//! use stackforge_core::pcap::rdpcap;
20//!
21//! let packets = rdpcap("capture.pcap").unwrap();
22//! let conversations = extract_flows(&packets).unwrap();
23//! for conv in &conversations {
24//!     println!("{}: {} packets", conv.key, conv.total_packets());
25//! }
26//! ```
27
28pub mod config;
29pub mod error;
30pub mod key;
31pub mod state;
32pub mod table;
33pub mod tcp_reassembly;
34pub mod tcp_state;
35pub mod udp_state;
36
37// Re-exports
38pub use config::FlowConfig;
39pub use error::FlowError;
40pub use key::{CanonicalKey, FlowDirection, TransportProtocol, extract_key};
41pub use state::{ConversationState, ConversationStatus, DirectionStats, ProtocolState};
42pub use table::ConversationTable;
43pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
44pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
45pub use udp_state::UdpFlowState;
46
47use crate::pcap::CapturedPacket;
48
49/// Extract bidirectional conversations from a list of captured packets.
50///
51/// This is the primary entry point for flow extraction. It processes all
52/// packets sequentially, groups them into bidirectional conversations using
53/// canonical key normalization, tracks TCP connection state and performs
54/// stream reassembly, and tracks UDP pseudo-conversations via timeouts.
55///
56/// Returns conversations sorted by start time.
57pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
58    extract_flows_with_config(packets, FlowConfig::default())
59}
60
61/// Extract flows with custom configuration.
62pub fn extract_flows_with_config(
63    packets: &[CapturedPacket],
64    config: FlowConfig,
65) -> Result<Vec<ConversationState>, FlowError> {
66    let table = ConversationTable::new(config);
67
68    for (index, captured) in packets.iter().enumerate() {
69        let timestamp = captured.metadata.timestamp;
70        table.ingest_packet(&captured.packet, timestamp, index)?;
71    }
72
73    Ok(table.into_conversations())
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    /// Endpoint that opens the TCP connection and sends the UDP datagram.
    const CLIENT: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
    /// Endpoint that answers the TCP SYN.
    const SERVER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 2);
    /// Destination of the UDP datagram (port 53 in the fixtures).
    const RESOLVER: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 3);

    /// Wrap a packet in capture metadata carrying the given timestamp (whole seconds).
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        let metadata = PcapMetadata {
            timestamp: Duration::from_secs(timestamp_secs),
            orig_len: 0,
        };
        CapturedPacket { packet, metadata }
    }

    /// Build an Ethernet/IPv4 packet around the supplied transport layer entry.
    fn frame(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, transport: LayerStackEntry) -> Packet {
        let ethernet = EthernetBuilder::new()
            .dst(MacAddress::BROADCAST)
            .src(MacAddress::new([0, 1, 2, 3, 4, 5]));
        let ipv4 = Ipv4Builder::new().src(src_ip).dst(dst_ip);

        LayerStack::new()
            .push(LayerStackEntry::Ethernet(ethernet))
            .push(LayerStackEntry::Ipv4(ipv4))
            .push(transport)
            .build_packet()
    }

    /// Build a TCP packet; each of `S`, `A`, `F`, `R` in `flags` sets the matching
    /// flag on the builder, and any other character is ignored.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let base = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        let tcp = flags.chars().fold(base, |segment, flag| match flag {
            'S' => segment.syn(),
            'A' => segment.ack(),
            'F' => segment.fin(),
            'R' => segment.rst(),
            _ => segment,
        });

        frame(src_ip, dst_ip, LayerStackEntry::Tcp(tcp))
    }

    /// Build a UDP packet between the given endpoints.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        let udp = UdpBuilder::new().src_port(sport).dst_port(dport);
        frame(src_ip, dst_ip, LayerStackEntry::Udp(udp))
    }

    /// SYN from the client (port 12345) to the server (port 80).
    fn client_syn(timestamp_secs: u64) -> CapturedPacket {
        make_captured(tcp_packet(CLIENT, SERVER, 12345, 80, "S"), timestamp_secs)
    }

    /// SYN+ACK from the server (port 80) back to the client (port 12345).
    fn server_syn_ack(timestamp_secs: u64) -> CapturedPacket {
        make_captured(tcp_packet(SERVER, CLIENT, 80, 12345, "SA"), timestamp_secs)
    }

    #[test]
    fn test_extract_flows_empty() {
        let conversations = extract_flows(&[]).unwrap();
        assert!(conversations.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = [client_syn(1), server_syn_ack(2)];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 1);

        // Both directions of the handshake must land in the same conversation.
        let conversation = &conversations[0];
        assert_eq!(conversation.total_packets(), 2);
        assert_eq!(conversation.key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        // A UDP datagram interleaved between the two TCP handshake packets.
        let datagram = make_captured(udp_packet(CLIENT, RESOLVER, 54321, 53), 2);
        let packets = [client_syn(1), datagram, server_syn_ack(3)];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        // Sorted by start time
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = [client_syn(1), server_syn_ack(2)];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }
}