1pub mod config;
29pub mod error;
30pub mod key;
31pub mod state;
32pub mod table;
33pub mod tcp_reassembly;
34pub mod tcp_state;
35pub mod udp_state;
36
37pub use config::FlowConfig;
39pub use error::FlowError;
40pub use key::{CanonicalKey, FlowDirection, TransportProtocol, extract_key};
41pub use state::{ConversationState, ConversationStatus, DirectionStats, ProtocolState};
42pub use table::ConversationTable;
43pub use tcp_reassembly::{ReassemblyAction, TcpReassembler};
44pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState};
45pub use udp_state::UdpFlowState;
46
47use crate::pcap::CapturedPacket;
48
49pub fn extract_flows(packets: &[CapturedPacket]) -> Result<Vec<ConversationState>, FlowError> {
58 extract_flows_with_config(packets, FlowConfig::default())
59}
60
61pub fn extract_flows_with_config(
63 packets: &[CapturedPacket],
64 config: FlowConfig,
65) -> Result<Vec<ConversationState>, FlowError> {
66 let table = ConversationTable::new(config);
67
68 for (index, captured) in packets.iter().enumerate() {
69 let timestamp = captured.metadata.timestamp;
70 table.ingest_packet(&captured.packet, timestamp, index)?;
71 }
72
73 Ok(table.into_conversations())
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::stack::{LayerStack, LayerStackEntry};
    use crate::pcap::PcapMetadata;
    use crate::{EthernetBuilder, Ipv4Builder, MacAddress, Packet, TcpBuilder, UdpBuilder};
    use std::net::Ipv4Addr;
    use std::time::Duration;

    // Endpoints shared by every fixture below.
    const CLIENT_IP: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
    const SERVER_IP: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 2);
    const OTHER_HOST_IP: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 3);
    const CLIENT_PORT: u16 = 12345;
    const SERVER_PORT: u16 = 80;

    /// Wraps `packet` in a `CapturedPacket` stamped at `timestamp_secs` seconds.
    fn make_captured(packet: Packet, timestamp_secs: u64) -> CapturedPacket {
        let metadata = PcapMetadata {
            timestamp: Duration::from_secs(timestamp_secs),
            orig_len: 0,
        };
        CapturedPacket { packet, metadata }
    }

    /// Builds an Ethernet / IPv4 packet carrying `transport` as its last layer.
    fn ipv4_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, transport: LayerStackEntry) -> Packet {
        LayerStack::new()
            .push(LayerStackEntry::Ethernet(
                EthernetBuilder::new()
                    .dst(MacAddress::BROADCAST)
                    .src(MacAddress::new([0, 1, 2, 3, 4, 5])),
            ))
            .push(LayerStackEntry::Ipv4(
                Ipv4Builder::new().src(src_ip).dst(dst_ip),
            ))
            .push(transport)
            .build_packet()
    }

    /// Builds a TCP packet whose flags are given as a string of flag letters:
    /// `S` (SYN), `A` (ACK), `F` (FIN), `R` (RST).
    ///
    /// # Panics
    ///
    /// Panics on any other character so that a typo in a fixture fails loudly
    /// instead of silently producing a packet with missing flags.
    fn tcp_packet(
        src_ip: Ipv4Addr,
        dst_ip: Ipv4Addr,
        sport: u16,
        dport: u16,
        flags: &str,
    ) -> Packet {
        let base = TcpBuilder::new()
            .src_port(sport)
            .dst_port(dport)
            .seq(1000)
            .ack_num(0)
            .window(65535);

        let builder = flags.chars().fold(base, |builder, flag| match flag {
            'S' => builder.syn(),
            'A' => builder.ack(),
            'F' => builder.fin(),
            'R' => builder.rst(),
            other => panic!("unsupported TCP flag character: {:?}", other),
        });

        ipv4_packet(src_ip, dst_ip, LayerStackEntry::Tcp(builder))
    }

    /// Builds a UDP packet between the given endpoints.
    fn udp_packet(src_ip: Ipv4Addr, dst_ip: Ipv4Addr, sport: u16, dport: u16) -> Packet {
        ipv4_packet(
            src_ip,
            dst_ip,
            LayerStackEntry::Udp(UdpBuilder::new().src_port(sport).dst_port(dport)),
        )
    }

    /// Client -> server SYN captured at `timestamp_secs`.
    fn client_syn(timestamp_secs: u64) -> CapturedPacket {
        make_captured(
            tcp_packet(CLIENT_IP, SERVER_IP, CLIENT_PORT, SERVER_PORT, "S"),
            timestamp_secs,
        )
    }

    /// Server -> client SYN-ACK (reverse direction of [`client_syn`]) captured
    /// at `timestamp_secs`.
    fn server_syn_ack(timestamp_secs: u64) -> CapturedPacket {
        make_captured(
            tcp_packet(SERVER_IP, CLIENT_IP, SERVER_PORT, CLIENT_PORT, "SA"),
            timestamp_secs,
        )
    }

    #[test]
    fn test_extract_flows_empty() {
        let result = extract_flows(&[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_extract_flows_single_tcp() {
        let packets = [client_syn(1), server_syn_ack(2)];

        let conversations = extract_flows(&packets).unwrap();
        // Both directions must be folded into a single conversation.
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].total_packets(), 2);
        assert_eq!(conversations[0].key.protocol, TransportProtocol::Tcp);
    }

    #[test]
    fn test_extract_flows_multiple_conversations() {
        // A UDP packet to a third host is interleaved between the two TCP packets.
        let packets = [
            client_syn(1),
            make_captured(udp_packet(CLIENT_IP, OTHER_HOST_IP, 54321, 53), 2),
            server_syn_ack(3),
        ];

        let conversations = extract_flows(&packets).unwrap();
        assert_eq!(conversations.len(), 2);
        assert!(conversations[0].start_time <= conversations[1].start_time);
    }

    #[test]
    fn test_extract_flows_preserves_packet_indices() {
        let packets = [client_syn(1), server_syn_ack(2)];

        let conversations = extract_flows(&packets).unwrap();
        // Check the length first so a regression reports a clear assertion
        // failure rather than an index-out-of-bounds panic.
        assert_eq!(conversations.len(), 1);
        assert_eq!(conversations[0].packet_indices, vec![0, 1]);
    }
}