1use std::fmt::Display;
2
3use tracing::{debug, info_span, trace, warn};
4use uuid::Uuid;
5
6use crate::flow_table::{Flow, FlowCompare};
7use crate::serialized::PacketExtra;
8use crate::stream::{in_range_wrapping, Stream, RESET_MAX_LOOKAHEAD};
9use crate::ConnectionHandler;
10use crate::TcpMeta;
11
/// Handshake and lifecycle state tracked for one connection.
#[derive(Debug, PartialEq)]
pub enum ConnectionState {
    /// Initial state of a freshly constructed connection; no SYN, SYN/ACK or
    /// data packet has been processed yet.
    None,
    /// A SYN without ACK was processed while in `None`.
    SynSent {
        /// Sequence number carried by that SYN.
        seq_no: u32,
    },
    /// A SYN/ACK was processed (either after a SYN, or as the first packet seen).
    SynReceived {
        /// Sequence number carried by the SYN/ACK.
        seq_no: u32,
        /// Acknowledgment number carried by the SYN/ACK.
        ack_no: u32,
        /// Window field carried by the SYN/ACK.
        window_size: u16,
        /// `true` when the SYN/ACK followed an observed SYN (`SynSent`),
        /// `false` when it was the first handshake packet seen.
        syn_seen: bool,
    },
    /// A non-SYN, non-RST packet was processed and both initial sequence
    /// numbers have been fixed.
    Established {
        /// Initial sequence number assigned to the forward stream.
        forward_isn: u32,
        /// Initial sequence number assigned to the reverse stream.
        reverse_isn: u32,
    },
    /// Entered after an accepted RST, or once both streams have ended.
    Closed,
    /// Entered when a SYN arrives on an already established connection.
    Desync,
}
45
/// Which way a packet or stream travels relative to the connection's
/// forward flow.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Same orientation as the connection's `forward_flow`.
    Forward,
    /// Opposite orientation to the connection's `forward_flow`.
    Reverse,
}

impl Direction {
    /// Returns the opposite direction.
    pub fn swap(self) -> Direction {
        if self == Self::Forward {
            Self::Reverse
        } else {
            Self::Forward
        }
    }
}

impl Display for Direction {
    /// Writes the lowercase name of the direction (`forward` / `reverse`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Forward => "forward",
            Self::Reverse => "reverse",
        };
        f.write_str(label)
    }
}
75
/// State for one tracked TCP connection, generic over the handler that
/// receives its events.
pub struct Connection<H: ConnectionHandler> {
    /// Random (v4) identifier generated when the connection is created;
    /// used as the `id` field of the `conn` tracing span.
    pub uuid: Uuid,
    /// Flow that defines the forward direction. It may be reversed while
    /// processing the first SYN or SYN/ACK so that the SYN sender is forward.
    pub forward_flow: Flow,
    /// Current handshake/lifecycle state.
    pub conn_state: ConnectionState,

    /// Set once a SYN, SYN/ACK and matching final ACK have all been seen.
    pub observed_handshake: bool,
    /// Set when the connection is closed by an accepted RST or after both
    /// streams have ended.
    pub observed_close: bool,

    /// Stream for packets travelling in the forward direction.
    pub forward_stream: Stream,
    /// Stream for packets travelling in the reverse direction.
    pub reverse_stream: Stream,

    /// Event handler. `None` only while the handler is being constructed or
    /// while it is temporarily taken out by `call_handler`.
    pub event_handler: Option<H>,
}
98
/// Result type for packet handling.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// returns this type; confirm whether it is used elsewhere.
pub enum HandlePacketResult {
    /// The only variant currently defined.
    Fine,
}
104
105impl<H: ConnectionHandler> Connection<H> {
106 pub fn new(
108 forward_flow: Flow,
109 handler_init_data: H::InitialData,
110 ) -> Result<Connection<H>, H::ConstructError> {
111 let mut conn = Connection {
112 uuid: Uuid::new_v4(),
113 forward_flow,
114 conn_state: ConnectionState::None,
115 observed_handshake: false,
116 observed_close: false,
117 forward_stream: Stream::new(),
118 reverse_stream: Stream::new(),
119 event_handler: None,
120 };
121 let handler = H::new(handler_init_data, &mut conn)?;
122 conn.event_handler = Some(handler);
123 Ok(conn)
124 }
125
126 pub fn get_stream(&mut self, direction: Direction) -> &mut Stream {
128 match direction {
129 Direction::Forward => &mut self.forward_stream,
130 Direction::Reverse => &mut self.reverse_stream,
131 }
132 }
133
134 #[tracing::instrument(name = "conn", skip_all, fields(id = %self.uuid))]
136 pub fn handle_packet(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
137 debug_assert_ne!(self.forward_flow.compare_tcp_meta(meta), FlowCompare::None);
138 if meta.flags.syn {
139 self.handle_syn(meta)
140 } else if meta.flags.rst {
141 self.handle_rst(meta, extra)
142 } else {
143 self.handle_data(meta, data, extra)
145 }
146 }
147
148 pub fn handle_syn(&mut self, meta: &TcpMeta) -> bool {
150 debug_assert!(meta.flags.syn);
151 if meta.flags.rst {
152 warn!("received strange packet with flags {:?}", meta.flags);
154 }
155 match self.conn_state {
156 ConnectionState::None => {
157 if meta.flags.ack {
158 self.conn_state = ConnectionState::SynReceived {
160 seq_no: meta.seq_number,
161 ack_no: meta.ack_number,
162 window_size: meta.window,
163 syn_seen: false,
164 };
165 debug!(
166 "handle_syn: got SYN/ACK (no SYN), None -> SynReceived (seq {}, ack {})",
167 meta.seq_number, meta.ack_number
168 );
169 if let Some(scale) = meta.option_window_scale {
170 trace!("got window scale (SYN/ACK): {}", scale);
171 self.reverse_stream.set_window_scale(scale);
172 }
173 if self.forward_flow.compare_tcp_meta(meta) == FlowCompare::Forward {
174 trace!("handle_syn: got SYN/ACK, reversing forward_flow");
176 self.forward_flow.reverse();
177 }
178 true
179 } else {
180 self.conn_state = ConnectionState::SynSent {
182 seq_no: meta.seq_number,
183 };
184 debug!(
185 "handle_syn: got SYN, None -> SynSent (seq {})",
186 meta.seq_number
187 );
188 if let Some(scale) = meta.option_window_scale {
189 trace!("got window scale (first SYN): {}", scale);
190 self.forward_stream.set_window_scale(scale);
191 }
192 if self.forward_flow.compare_tcp_meta(meta) == FlowCompare::Reverse {
193 self.forward_flow.reverse();
195 }
196 true
197 }
198 }
199 ConnectionState::SynSent { seq_no } => {
200 if meta.flags.ack {
202 if self.forward_flow.compare_tcp_meta(meta) != FlowCompare::Reverse {
204 debug!("handle_syn: dropped SYN/ACK in wrong direction (state SynSent)");
206 false
207 } else {
208 if meta.ack_number != seq_no + 1 {
209 warn!(
210 "SYN/ACK packet ack number mismatch: expected {}, found {}",
211 seq_no + 1,
212 meta.ack_number
213 );
214 }
215 self.conn_state = ConnectionState::SynReceived {
216 seq_no: meta.seq_number,
217 ack_no: meta.ack_number,
218 window_size: meta.window,
219 syn_seen: true,
220 };
221 debug!(
222 "handle_syn: received SYN/ACK, SynSent -> SynReceived (seq {}, ack {})",
223 meta.seq_number, meta.ack_number
224 );
225 if let Some(scale) = meta.option_window_scale {
226 trace!("got window scale (SYN/ACK): {}", scale);
227 self.reverse_stream.set_window_scale(scale);
228 }
229 true
230 }
231 } else {
232 false
234 }
235 }
236 ConnectionState::SynReceived { .. } => {
237 false
239 }
240 ConnectionState::Established { .. } => {
241 warn!("received SYN for established connection?");
243 self.conn_state = ConnectionState::Desync;
244 let dir = self
245 .forward_flow
246 .compare_tcp_meta(meta)
247 .to_direction()
248 .expect("connection got unrelated packet");
249 self.call_handler(|conn, h| h.connection_desync(conn, dir));
250 false
251 }
252 _ => false, }
254 }
255
256 pub fn handle_rst(&mut self, meta: &TcpMeta, extra: &PacketExtra) -> bool {
258 debug_assert!(meta.flags.rst);
259 let dir = self
260 .forward_flow
261 .compare_tcp_meta(meta)
262 .to_direction()
263 .expect("got unrelated flow");
264 match self.conn_state {
265 ConnectionState::None => {
266 debug!("handle_rst: received reset in {dir} direction in state None");
268 }
269 ConnectionState::SynSent { .. } => {
274 if dir == Direction::Forward {
275 warn!(
277 "received likely invalid reset in state SynSent with same direction as SYN"
278 );
279 return false;
280 }
281 debug!("got reset ({dir}) in state SynSent, likely connection refused");
283 }
284 ConnectionState::SynReceived { seq_no, ack_no, .. } => {
285 let base = match dir {
286 Direction::Forward => seq_no,
288 Direction::Reverse => ack_no,
290 };
291
292 if in_range_wrapping(base, 0, RESET_MAX_LOOKAHEAD, meta.seq_number) {
293 debug!("handle_rst: got reset ({dir}) in state SynReceived");
294 } else {
295 warn!(
296 "got likely invalid reset ({dir}) in state SynReceived (seq {}, base {})",
297 meta.seq_number, base
298 );
299 return false;
300 }
301 }
302 ConnectionState::Established { .. } => {
303 let sp = info_span!("stream", %dir);
305 let accepted = sp.in_scope(|| match dir {
306 Direction::Forward => self
307 .forward_stream
308 .handle_rst_packet(meta.seq_number, extra),
309 Direction::Reverse => self
310 .reverse_stream
311 .handle_rst_packet(meta.seq_number, extra),
312 });
313 if !accepted {
314 return false;
315 }
316 }
317 ConnectionState::Closed | ConnectionState::Desync => {
318 return false;
320 }
321 }
322
323 match dir {
324 Direction::Forward => {
325 self.forward_stream.had_reset = true;
326 }
327 Direction::Reverse => {
328 self.reverse_stream.had_reset = true;
329 }
330 }
331 self.conn_state = ConnectionState::Closed;
332 self.observed_close = true;
333 self.call_handler(|conn, h| h.rst_received(conn, dir, extra.clone()));
334 true
335 }
336
337 pub fn handle_data_hs1(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
339 debug!(
340 "handle_data_hs1: received data before handshake completion, {:?} -> Established",
341 self.conn_state
342 );
343 let (forward_isn, reverse_isn) = match self.forward_flow.compare_tcp_meta(meta) {
344 FlowCompare::Forward => (meta.seq_number, meta.ack_number),
345 FlowCompare::Reverse => (meta.ack_number, meta.seq_number),
346 _ => unreachable!("got unrelated flow"),
347 };
348
349 self.conn_state = ConnectionState::Established {
350 forward_isn,
351 reverse_isn,
352 };
353
354 self.forward_stream.set_isn(forward_isn, 0);
355 self.reverse_stream.set_isn(reverse_isn, 0);
356
357 debug!("handle_data_hs1: assuming forward isn: {forward_isn}, reverse isn: {reverse_isn}");
358
359 self.call_handler(|conn, h| h.handshake_done(conn));
360
361 if !data.is_empty() {
362 self.handle_data_established(meta, data, extra)
363 } else {
364 true
365 }
366 }
367
368 pub fn handle_data_hs2(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
370 let ConnectionState::SynReceived {
371 seq_no,
372 ack_no,
373 window_size: forward_window,
374 syn_seen,
375 } = self.conn_state
376 else {
377 panic!("handle_data_hs2: wrong state");
378 };
379
380 let mut reverse_window: u16 = 0;
381 let (forward_isn, reverse_isn) = match self.forward_flow.compare_tcp_meta(meta) {
382 FlowCompare::Forward => {
383 if meta.flags.ack && meta.seq_number == ack_no && meta.ack_number == seq_no + 1 {
384 if syn_seen {
385 self.observed_handshake = true;
386 reverse_window = meta.window;
387 debug!("handle_data_hs2: got complete handshake");
388 } else {
389 debug!("handle_data_hs2: got SYN/ACK and ACK of handshake");
390 }
391 } else {
392 debug!("handle_data_hs2: probably lost final packet of handshake")
393 }
394 (meta.seq_number, meta.ack_number)
395 }
396 FlowCompare::Reverse => {
397 debug!("handle_data_hs2: received reverse direction packet instead of final handshake ACK");
398 (meta.ack_number, meta.seq_number)
399 }
400 _ => unreachable!("got unrelated flow"),
401 };
402 debug!(
403 "handle_data_hs2: received data packet, SynReceived -> Established \
404 (forward_isn: {forward_isn}, reverse_isn: {reverse_isn})"
405 );
406
407 self.conn_state = ConnectionState::Established {
408 forward_isn,
409 reverse_isn,
410 };
411 self.forward_stream.set_isn(forward_isn, forward_window);
412 self.reverse_stream.set_isn(reverse_isn, reverse_window);
413 self.call_handler(|conn, h| h.handshake_done(conn));
414
415 if !data.is_empty() {
416 self.handle_data_established(meta, data, extra)
417 } else {
418 true
419 }
420 }
421
422 pub fn handle_data_established(
424 &mut self,
425 meta: &TcpMeta,
426 data: &[u8],
427 extra: &PacketExtra,
428 ) -> bool {
429 let dir;
430 let (data_stream, ack_stream) = match self.forward_flow.compare_tcp_meta(meta) {
431 FlowCompare::Forward => {
432 dir = Direction::Forward;
433 (&mut self.forward_stream, &mut self.reverse_stream)
434 }
435 FlowCompare::Reverse => {
436 dir = Direction::Reverse;
437 (&mut self.reverse_stream, &mut self.forward_stream)
438 }
439 _ => unreachable!("got unrelated flow"),
440 };
441
442 let mut did_something = false;
443 let mut got_data = false;
444 if !data.is_empty() {
445 let sp = info_span!("stream", %dir);
447 got_data = sp.in_scope(|| data_stream.handle_data_packet(meta.seq_number, data, extra));
448 did_something |= got_data;
449 }
450 let mut got_ack = false;
451 let mut ack_stream_got_end = false;
452 if meta.flags.ack {
453 let was_ended = ack_stream.has_ended;
454 let sp = info_span!("stream", dir = %dir.swap());
456 got_ack |=
457 sp.in_scope(|| ack_stream.handle_ack_packet(meta.ack_number, meta.window, extra));
458 did_something |= got_ack;
459 data_stream.reverse_acked = ack_stream.highest_acked;
461
462 if !was_ended && ack_stream.has_ended {
463 ack_stream_got_end = true;
464 trace!("handle_data: {} received ACK for FIN", dir.swap());
465 }
466 }
467 let data_stream_has_ended = data_stream.has_ended;
468 let mut got_fin = false;
469 if meta.flags.fin {
470 let sp = info_span!("stream", %dir);
472 got_fin =
473 sp.in_scope(|| data_stream.handle_fin_packet(meta.seq_number, data.len(), extra));
474 did_something |= got_fin;
475 }
476
477 if got_data {
479 self.call_handler(|conn, h| h.data_received(conn, dir));
480 }
481 if got_ack {
482 self.call_handler(|conn, h| h.ack_received(conn, dir));
483 }
484 if got_fin {
485 self.call_handler(|conn, h| h.fin_received(conn, dir));
486 }
487
488 if ack_stream_got_end {
489 self.call_handler(|conn, h| h.stream_end(conn, dir.swap()));
490
491 if data_stream_has_ended {
493 self.conn_state = ConnectionState::Closed;
494 self.observed_close = true;
495 }
496 }
497
498 did_something
499 }
500
501 pub fn handle_data(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
503 match self.conn_state {
504 ConnectionState::None | ConnectionState::SynSent { .. } => {
505 self.handle_data_hs1(meta, data, extra)
506 }
507 ConnectionState::SynReceived { .. } => self.handle_data_hs2(meta, data, extra),
508 _ => {
509 self.handle_data_established(meta, data, extra)
511 }
512 }
513 }
514
515 pub fn call_handler(&mut self, do_thing: impl FnOnce(&mut Self, &mut H)) {
517 if let Some(mut handler) = self.event_handler.take() {
518 do_thing(self, &mut handler);
519 self.event_handler = Some(handler);
520 }
521 }
522
523 pub fn will_retire(&mut self) {
525 self.call_handler(|conn, h| h.will_retire(conn));
526 }
527}
528
529#[cfg(test)]
530mod test {
531 use crate::serialized::PacketExtra;
532 use crate::{initialize_logging, ConnectionHandler, TcpFlags, TcpMeta};
533 use parking_lot::Mutex;
534 use std::convert::Infallible;
535 use std::mem;
536
537 use super::{Connection, Direction};
538
539 fn swap_meta(meta: &TcpMeta) -> TcpMeta {
541 let mut out = meta.clone();
542 macro_rules! swap {
544 ($i1:ident, $i2:ident) => {
545 mem::swap(&mut out.$i1, &mut out.$i2)
546 };
547 }
548 swap!(src_addr, dst_addr);
549 swap!(src_port, dst_port);
550 swap!(seq_number, ack_number);
551 out
552 }
553
    // Global flags written by `TestHandler`'s callbacks so that tests can
    // check which events fired. Boolean flags record that the event happened;
    // `Option<Direction>` flags record the direction of the latest event.
    static HANDSHAKE_DONE: Mutex<bool> = Mutex::new(false);
    static DATA_RECEIVED: Mutex<Option<Direction>> = Mutex::new(None);
    static FIN_RECEIVED: Mutex<Option<Direction>> = Mutex::new(None);
    static RST_RECEIVED: Mutex<Option<Direction>> = Mutex::new(None);
    static STREAM_END: Mutex<Option<Direction>> = Mutex::new(None);
    static WILL_RETIRE: Mutex<bool> = Mutex::new(false);
560
561 struct TestHandler;
562 impl ConnectionHandler for TestHandler {
563 type InitialData = ();
564 type ConstructError = Infallible;
565 fn new(_init: (), _conn: &mut Connection<Self>) -> Result<Self, Infallible> {
566 Ok(TestHandler)
567 }
568 fn handshake_done(&mut self, _conn: &mut Connection<Self>) {
569 let mut guard = HANDSHAKE_DONE.lock();
570 *guard = true;
571 }
572 fn data_received(&mut self, _connection: &mut Connection<Self>, direction: Direction) {
573 let mut guard = DATA_RECEIVED.lock();
574 *guard = Some(direction);
575 }
576 fn fin_received(&mut self, _connection: &mut Connection<Self>, direction: Direction) {
577 let mut guard = FIN_RECEIVED.lock();
578 *guard = Some(direction);
579 }
580 fn rst_received(
581 &mut self,
582 _connection: &mut Connection<Self>,
583 direction: Direction,
584 _extra: PacketExtra,
585 ) {
586 let mut guard = RST_RECEIVED.lock();
587 *guard = Some(direction);
588 }
589 fn stream_end(&mut self, _connection: &mut Connection<Self>, direction: Direction) {
590 let mut guard = STREAM_END.lock();
591 *guard = Some(direction);
592 }
593 fn will_retire(&mut self, _connection: &mut Connection<Self>) {
594 let mut guard = WILL_RETIRE.lock();
595 *guard = true;
596 }
597 }
598
599 #[test]
600 fn simple() {
601 initialize_logging();
602
603 let hs1 = TcpMeta {
604 src_addr: [91, 92, 144, 105].into(),
605 src_port: 3161,
606 dst_addr: [23, 146, 104, 1].into(),
607 dst_port: 45143,
608 seq_number: 1587232,
609 ack_number: 0,
610 flags: TcpFlags {
611 syn: true,
612 ..Default::default()
613 },
614 window: 256,
615 option_window_scale: Some(2),
616 option_timestamp: None,
617 };
618
619 let mut conn: Connection<TestHandler> = Connection::new((&hs1).into(), ()).unwrap();
620 assert!(conn.handle_packet(&hs1, &[], &PacketExtra::None));
621 let mut hs2 = swap_meta(&hs1);
622 hs2.seq_number = 315848;
623 hs2.ack_number += 1;
624 hs2.flags.ack = true;
625 assert!(conn.handle_packet(&hs2, &[], &PacketExtra::None));
626 let mut hs3 = swap_meta(&hs2);
627 hs3.ack_number += 1;
628 hs3.flags.syn = false;
629 assert!(conn.handle_packet(&hs3, &[], &PacketExtra::None));
630
631 let mut hs_done = HANDSHAKE_DONE.lock();
632 assert!(*hs_done);
633 *hs_done = false;
634
635 let data1 = hs3.clone();
636 assert!(conn.handle_packet(&data1, b"test", &PacketExtra::None));
637 assert_eq!(conn.forward_stream.readable_buffered_length(), 4);
638 }
639}