parse_tcp/
connection.rs

1use std::fmt::Display;
2
3use tracing::{debug, info_span, trace, warn};
4use uuid::Uuid;
5
6use crate::flow_table::{Flow, FlowCompare};
7use crate::serialized::PacketExtra;
8use crate::stream::{in_range_wrapping, Stream, RESET_MAX_LOOKAHEAD};
9use crate::ConnectionHandler;
10use crate::TcpMeta;
11
/// TCP handshake state
///
/// State of the per-connection state machine driven by
/// `Connection::handle_syn`, `Connection::handle_rst` and the
/// `Connection::handle_data*` methods. The type is a small plain-data enum,
/// so it is `Copy` and fully comparable (`Eq`), like [`Direction`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// not yet initialized (state of a freshly created connection)
    None,
    /// SYN read, expect SYN/ACK
    SynSent {
        /// sequence number of initial SYN
        seq_no: u32,
    },
    /// SYN/ACK read, expect ACK
    SynReceived {
        /// sequence number of SYN/ACK
        seq_no: u32,
        /// acknowledgment number of SYN/ACK
        ack_no: u32,
        /// window size of SYN/ACK
        window_size: u16,
        /// whether or not we saw the first SYN (`false` when the SYN/ACK was
        /// the first packet seen for the connection)
        syn_seen: bool,
    },
    /// handshake complete, connection established
    Established {
        /// initial sequence number of forward direction
        forward_isn: u32,
        /// initial sequence number of reverse direction
        reverse_isn: u32,
    },
    /// connection closed (accepted RST, or both streams have ended)
    Closed,
    /// connection fatally desynchronized (e.g. SYN seen on an established
    /// connection)
    Desync,
}
45
/// packet direction
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Direction {
    /// forward direction: client -> server, assuming client is whoever sent the
    /// first SYN
    Forward,
    /// reverse direction: server -> client, assuming client is whoever sent the
    /// first SYN
    Reverse,
}

impl Direction {
    /// return the opposite direction
    pub fn swap(self) -> Direction {
        if self == Direction::Forward {
            Direction::Reverse
        } else {
            Direction::Forward
        }
    }
}

impl Display for Direction {
    /// write the lowercase name of the direction (`forward` / `reverse`)
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Direction::Forward => "forward",
            Direction::Reverse => "reverse",
        };
        f.write_str(name)
    }
}
75
/// object representing TCP connection
///
/// Holds the handshake state machine, one [`Stream`] per direction and the
/// optional event handler that gets notified about connection events.
pub struct Connection<H: ConnectionHandler> {
    /// unique identifier for connection (generated with `Uuid::new_v4` in `new`)
    pub uuid: Uuid,
    /// forward direction flow identifier; `handle_syn` may reverse it so that
    /// the sender of the first SYN is the forward side
    pub forward_flow: Flow,
    /// state of connection handshake
    pub conn_state: ConnectionState,

    /// whether the full 3-way handshake was observed
    pub observed_handshake: bool,
    /// whether the connection close was observed (either by FIN or RST)
    pub observed_close: bool,

    /// forward direction stream
    pub forward_stream: Stream,
    /// reverse direction stream
    pub reverse_stream: Stream,

    /// event handler object; `None` while the handler is being constructed
    /// in `new` and while a callback is running inside `call_handler`
    pub event_handler: Option<H>,
}
98
/// result from Connection::handle_packet
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// returns this type (`Connection::handle_packet` returns `bool`) — confirm
/// whether it is still used elsewhere.
pub enum HandlePacketResult {
    /// everything was fine, probably
    Fine,
}
104
105impl<H: ConnectionHandler> Connection<H> {
106    /// create new connection with flow
107    pub fn new(
108        forward_flow: Flow,
109        handler_init_data: H::InitialData,
110    ) -> Result<Connection<H>, H::ConstructError> {
111        let mut conn = Connection {
112            uuid: Uuid::new_v4(),
113            forward_flow,
114            conn_state: ConnectionState::None,
115            observed_handshake: false,
116            observed_close: false,
117            forward_stream: Stream::new(),
118            reverse_stream: Stream::new(),
119            event_handler: None,
120        };
121        let handler = H::new(handler_init_data, &mut conn)?;
122        conn.event_handler = Some(handler);
123        Ok(conn)
124    }
125
126    /// get stream in direction
127    pub fn get_stream(&mut self, direction: Direction) -> &mut Stream {
128        match direction {
129            Direction::Forward => &mut self.forward_stream,
130            Direction::Reverse => &mut self.reverse_stream,
131        }
132    }
133
134    /// handle a packet supposedly belonging to this connection
135    #[tracing::instrument(name = "conn", skip_all, fields(id = %self.uuid))]
136    pub fn handle_packet(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
137        debug_assert_ne!(self.forward_flow.compare_tcp_meta(meta), FlowCompare::None);
138        if meta.flags.syn {
139            self.handle_syn(meta)
140        } else if meta.flags.rst {
141            self.handle_rst(meta, extra)
142        } else {
143            // FIN packets handled here too, as they may carry data
144            self.handle_data(meta, data, extra)
145        }
146    }
147
148    /// handle packet with SYN flag
149    pub fn handle_syn(&mut self, meta: &TcpMeta) -> bool {
150        debug_assert!(meta.flags.syn);
151        if meta.flags.rst {
152            // probably shouldn't happen
153            warn!("received strange packet with flags {:?}", meta.flags);
154        }
155        match self.conn_state {
156            ConnectionState::None => {
157                if meta.flags.ack {
158                    // SYN/ACK
159                    self.conn_state = ConnectionState::SynReceived {
160                        seq_no: meta.seq_number,
161                        ack_no: meta.ack_number,
162                        window_size: meta.window,
163                        syn_seen: false,
164                    };
165                    debug!(
166                        "handle_syn: got SYN/ACK (no SYN), None -> SynReceived (seq {}, ack {})",
167                        meta.seq_number, meta.ack_number
168                    );
169                    if let Some(scale) = meta.option_window_scale {
170                        trace!("got window scale (SYN/ACK): {}", scale);
171                        self.reverse_stream.set_window_scale(scale);
172                    }
173                    if self.forward_flow.compare_tcp_meta(meta) == FlowCompare::Forward {
174                        // SYN/ACK is expected server -> client
175                        trace!("handle_syn: got SYN/ACK, reversing forward_flow");
176                        self.forward_flow.reverse();
177                    }
178                    true
179                } else {
180                    // first SYN
181                    self.conn_state = ConnectionState::SynSent {
182                        seq_no: meta.seq_number,
183                    };
184                    debug!(
185                        "handle_syn: got SYN, None -> SynSent (seq {})",
186                        meta.seq_number
187                    );
188                    if let Some(scale) = meta.option_window_scale {
189                        trace!("got window scale (first SYN): {}", scale);
190                        self.forward_stream.set_window_scale(scale);
191                    }
192                    if self.forward_flow.compare_tcp_meta(meta) == FlowCompare::Reverse {
193                        // SYN is expected client -> server
194                        self.forward_flow.reverse();
195                    }
196                    true
197                }
198            }
199            ConnectionState::SynSent { seq_no } => {
200                // expect: SYN/ACK
201                if meta.flags.ack {
202                    // SYN/ACK received
203                    if self.forward_flow.compare_tcp_meta(meta) != FlowCompare::Reverse {
204                        // wrong direction?
205                        debug!("handle_syn: dropped SYN/ACK in wrong direction (state SynSent)");
206                        false
207                    } else {
208                        if meta.ack_number != seq_no + 1 {
209                            warn!(
210                                "SYN/ACK packet ack number mismatch: expected {}, found {}",
211                                seq_no + 1,
212                                meta.ack_number
213                            );
214                        }
215                        self.conn_state = ConnectionState::SynReceived {
216                            seq_no: meta.seq_number,
217                            ack_no: meta.ack_number,
218                            window_size: meta.window,
219                            syn_seen: true,
220                        };
221                        debug!(
222                            "handle_syn: received SYN/ACK, SynSent -> SynReceived (seq {}, ack {})",
223                            meta.seq_number, meta.ack_number
224                        );
225                        if let Some(scale) = meta.option_window_scale {
226                            trace!("got window scale (SYN/ACK): {}", scale);
227                            self.reverse_stream.set_window_scale(scale);
228                        }
229                        true
230                    }
231                } else {
232                    // likely duplicate SYN
233                    false
234                }
235            }
236            ConnectionState::SynReceived { .. } => {
237                // either duplicate SYN or SYN/ACK, ignore
238                false
239            }
240            ConnectionState::Established { .. } => {
241                // ???
242                warn!("received SYN for established connection?");
243                self.conn_state = ConnectionState::Desync;
244                let dir = self
245                    .forward_flow
246                    .compare_tcp_meta(meta)
247                    .to_direction()
248                    .expect("connection got unrelated packet");
249                self.call_handler(|conn, h| h.connection_desync(conn, dir));
250                false
251            }
252            _ => false, // ignore
253        }
254    }
255
256    /// handle packet with RST flag
257    pub fn handle_rst(&mut self, meta: &TcpMeta, extra: &PacketExtra) -> bool {
258        debug_assert!(meta.flags.rst);
259        let dir = self
260            .forward_flow
261            .compare_tcp_meta(meta)
262            .to_direction()
263            .expect("got unrelated flow");
264        match self.conn_state {
265            ConnectionState::None => {
266                // nothing to validate
267                debug!("handle_rst: received reset in {dir} direction in state None");
268            }
269            // note that rejecting potentially legitimate resets in the handshake states
270            // doesn't cause significant problems, as the connection will resync on the
271            // first data packet. similarly, accepting potentially invalid resets will
272            // simply cause the connection to be recreated on the next packet.
273            ConnectionState::SynSent { .. } => {
274                if dir == Direction::Forward {
275                    // reset in response to nothing?
276                    warn!(
277                        "received likely invalid reset in state SynSent with same direction as SYN"
278                    );
279                    return false;
280                }
281                // cannot really validate, assume valid
282                debug!("got reset ({dir}) in state SynSent, likely connection refused");
283            }
284            ConnectionState::SynReceived { seq_no, ack_no, .. } => {
285                let base = match dir {
286                    // reset should have seq after seq of SYN/ACK
287                    Direction::Forward => seq_no,
288                    // reset should have seq after ack of SYN/ACK
289                    Direction::Reverse => ack_no,
290                };
291
292                if in_range_wrapping(base, 0, RESET_MAX_LOOKAHEAD, meta.seq_number) {
293                    debug!("handle_rst: got reset ({dir}) in state SynReceived");
294                } else {
295                    warn!(
296                        "got likely invalid reset ({dir}) in state SynReceived (seq {}, base {})",
297                        meta.seq_number, base
298                    );
299                    return false;
300                }
301            }
302            ConnectionState::Established { .. } => {
303                // let the stream handle it
304                let sp = info_span!("stream", %dir);
305                let accepted = sp.in_scope(|| match dir {
306                    Direction::Forward => self
307                        .forward_stream
308                        .handle_rst_packet(meta.seq_number, extra),
309                    Direction::Reverse => self
310                        .reverse_stream
311                        .handle_rst_packet(meta.seq_number, extra),
312                });
313                if !accepted {
314                    return false;
315                }
316            }
317            ConnectionState::Closed | ConnectionState::Desync => {
318                // connection already dead
319                return false;
320            }
321        }
322
323        match dir {
324            Direction::Forward => {
325                self.forward_stream.had_reset = true;
326            }
327            Direction::Reverse => {
328                self.reverse_stream.had_reset = true;
329            }
330        }
331        self.conn_state = ConnectionState::Closed;
332        self.observed_close = true;
333        self.call_handler(|conn, h| h.rst_received(conn, dir, extra.clone()));
334        true
335    }
336
337    /// handle data packet received before SYN/ACK
338    pub fn handle_data_hs1(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
339        debug!(
340            "handle_data_hs1: received data before handshake completion, {:?} -> Established",
341            self.conn_state
342        );
343        let (forward_isn, reverse_isn) = match self.forward_flow.compare_tcp_meta(meta) {
344            FlowCompare::Forward => (meta.seq_number, meta.ack_number),
345            FlowCompare::Reverse => (meta.ack_number, meta.seq_number),
346            _ => unreachable!("got unrelated flow"),
347        };
348
349        self.conn_state = ConnectionState::Established {
350            forward_isn,
351            reverse_isn,
352        };
353
354        self.forward_stream.set_isn(forward_isn, 0);
355        self.reverse_stream.set_isn(reverse_isn, 0);
356
357        debug!("handle_data_hs1: assuming forward isn: {forward_isn}, reverse isn: {reverse_isn}");
358
359        self.call_handler(|conn, h| h.handshake_done(conn));
360
361        if !data.is_empty() {
362            self.handle_data_established(meta, data, extra)
363        } else {
364            true
365        }
366    }
367
368    /// handle data packet received after SYN/ACK
369    pub fn handle_data_hs2(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
370        let ConnectionState::SynReceived {
371            seq_no,
372            ack_no,
373            window_size: forward_window,
374            syn_seen,
375        } = self.conn_state
376        else {
377            panic!("handle_data_hs2: wrong state");
378        };
379
380        let mut reverse_window: u16 = 0;
381        let (forward_isn, reverse_isn) = match self.forward_flow.compare_tcp_meta(meta) {
382            FlowCompare::Forward => {
383                if meta.flags.ack && meta.seq_number == ack_no && meta.ack_number == seq_no + 1 {
384                    if syn_seen {
385                        self.observed_handshake = true;
386                        reverse_window = meta.window;
387                        debug!("handle_data_hs2: got complete handshake");
388                    } else {
389                        debug!("handle_data_hs2: got SYN/ACK and ACK of handshake");
390                    }
391                } else {
392                    debug!("handle_data_hs2: probably lost final packet of handshake")
393                }
394                (meta.seq_number, meta.ack_number)
395            }
396            FlowCompare::Reverse => {
397                debug!("handle_data_hs2: received reverse direction packet instead of final handshake ACK");
398                (meta.ack_number, meta.seq_number)
399            }
400            _ => unreachable!("got unrelated flow"),
401        };
402        debug!(
403            "handle_data_hs2: received data packet, SynReceived -> Established \
404            (forward_isn: {forward_isn}, reverse_isn: {reverse_isn})"
405        );
406
407        self.conn_state = ConnectionState::Established {
408            forward_isn,
409            reverse_isn,
410        };
411        self.forward_stream.set_isn(forward_isn, forward_window);
412        self.reverse_stream.set_isn(reverse_isn, reverse_window);
413        self.call_handler(|conn, h| h.handshake_done(conn));
414
415        if !data.is_empty() {
416            self.handle_data_established(meta, data, extra)
417        } else {
418            true
419        }
420    }
421
    /// handle data after handshake is completed
    ///
    /// Feeds the packet to the streams in three steps: payload to the stream
    /// of the packet's direction, ACK to the stream of the opposite direction,
    /// FIN to the stream of the packet's direction. Afterwards the event
    /// handler is notified (`data_received`, `ack_received`, `fin_received`,
    /// `stream_end`) for every step the streams reported as accepted.
    ///
    /// The connection moves to `Closed` when this packet's ACK ends the
    /// opposite stream and the packet's own stream had already ended.
    ///
    /// Returns `true` if any of the data/ACK/FIN steps was reported as
    /// accepted by the streams.
    pub fn handle_data_established(
        &mut self,
        meta: &TcpMeta,
        data: &[u8],
        extra: &PacketExtra,
    ) -> bool {
        let dir;
        // data_stream: stream in the packet's direction (receives payload/FIN)
        // ack_stream: stream in the opposite direction (receives the ACK)
        let (data_stream, ack_stream) = match self.forward_flow.compare_tcp_meta(meta) {
            FlowCompare::Forward => {
                dir = Direction::Forward;
                (&mut self.forward_stream, &mut self.reverse_stream)
            }
            FlowCompare::Reverse => {
                dir = Direction::Reverse;
                (&mut self.reverse_stream, &mut self.forward_stream)
            }
            _ => unreachable!("got unrelated flow"),
        };

        let mut did_something = false;
        let mut got_data = false;
        if !data.is_empty() {
            // write data to stream
            let sp = info_span!("stream", %dir);
            got_data = sp.in_scope(|| data_stream.handle_data_packet(meta.seq_number, data, extra));
            did_something |= got_data;
        }
        let mut got_ack = false;
        let mut ack_stream_got_end = false;
        if meta.flags.ack {
            // remember the previous value to detect a transition caused by this ACK
            let was_ended = ack_stream.has_ended;
            // send ack to the stream in the opposite direction
            let sp = info_span!("stream", dir = %dir.swap());
            got_ack |=
                sp.in_scope(|| ack_stream.handle_ack_packet(meta.ack_number, meta.window, extra));
            did_something |= got_ack;
            // set ack offset on stream to correlate directions
            data_stream.reverse_acked = ack_stream.highest_acked;

            if !was_ended && ack_stream.has_ended {
                ack_stream_got_end = true;
                trace!("handle_data: {} received ACK for FIN", dir.swap());
            }
        }
        // sampled before the FIN of this packet is processed
        let data_stream_has_ended = data_stream.has_ended;
        let mut got_fin = false;
        if meta.flags.fin {
            // notify stream of fin
            let sp = info_span!("stream", %dir);
            got_fin =
                sp.in_scope(|| data_stream.handle_fin_packet(meta.seq_number, data.len(), extra));
            did_something |= got_fin;
        }

        // call event handlers (the stream borrows are no longer used from
        // here on, so `self` can be borrowed again)
        if got_data {
            self.call_handler(|conn, h| h.data_received(conn, dir));
        }
        if got_ack {
            self.call_handler(|conn, h| h.ack_received(conn, dir));
        }
        if got_fin {
            self.call_handler(|conn, h| h.fin_received(conn, dir));
        }

        if ack_stream_got_end {
            // the stream that just ended is the one opposite to this packet
            self.call_handler(|conn, h| h.stream_end(conn, dir.swap()));

            // update state if both sides closed
            if data_stream_has_ended {
                self.conn_state = ConnectionState::Closed;
                self.observed_close = true;
            }
        }

        did_something
    }
500
501    /// handle ordinary data packet
502    pub fn handle_data(&mut self, meta: &TcpMeta, data: &[u8], extra: &PacketExtra) -> bool {
503        match self.conn_state {
504            ConnectionState::None | ConnectionState::SynSent { .. } => {
505                self.handle_data_hs1(meta, data, extra)
506            }
507            ConnectionState::SynReceived { .. } => self.handle_data_hs2(meta, data, extra),
508            _ => {
509                // established or (closed but more data)
510                self.handle_data_established(meta, data, extra)
511            }
512        }
513    }
514
515    /// call the event handler, if one exists
516    pub fn call_handler(&mut self, do_thing: impl FnOnce(&mut Self, &mut H)) {
517        if let Some(mut handler) = self.event_handler.take() {
518            do_thing(self, &mut handler);
519            self.event_handler = Some(handler);
520        }
521    }
522
523    /// called before connection is removed from hashtable
524    pub fn will_retire(&mut self) {
525        self.call_handler(|conn, h| h.will_retire(conn));
526    }
527}
528
529#[cfg(test)]
530mod test {
531    use crate::serialized::PacketExtra;
532    use crate::{initialize_logging, ConnectionHandler, TcpFlags, TcpMeta};
533    use parking_lot::Mutex;
534    use std::convert::Infallible;
535    use std::mem;
536
537    use super::{Connection, Direction};
538
539    /// swap src/dest ip/port and seq/ack
540    fn swap_meta(meta: &TcpMeta) -> TcpMeta {
541        let mut out = meta.clone();
542        // crimes against something, idk what, but it's crimes
543        macro_rules! swap {
544            ($i1:ident, $i2:ident) => {
545                mem::swap(&mut out.$i1, &mut out.$i2)
546            };
547        }
548        swap!(src_addr, dst_addr);
549        swap!(src_port, dst_port);
550        swap!(seq_number, ack_number);
551        out
552    }
553
    // Global flags recording which `TestHandler` callbacks have fired. They
    // are statics, so they are shared by every test in this module.
    // set to `true` by `TestHandler::handshake_done`
    static HANDSHAKE_DONE: Mutex<bool> = Mutex::new(false);
    // direction passed to the most recent `TestHandler::data_received`
    static DATA_RECEIVED: Mutex<Option<Direction>> = Mutex::new(None);
    // direction passed to the most recent `TestHandler::fin_received`
    static FIN_RECEIVED: Mutex<Option<Direction>> = Mutex::new(None);
    // direction passed to the most recent `TestHandler::rst_received`
    static RST_RECEIVED: Mutex<Option<Direction>> = Mutex::new(None);
    // direction passed to the most recent `TestHandler::stream_end`
    static STREAM_END: Mutex<Option<Direction>> = Mutex::new(None);
    // set to `true` by `TestHandler::will_retire`
    static WILL_RETIRE: Mutex<bool> = Mutex::new(false);
560
561    struct TestHandler;
562    impl ConnectionHandler for TestHandler {
563        type InitialData = ();
564        type ConstructError = Infallible;
565        fn new(_init: (), _conn: &mut Connection<Self>) -> Result<Self, Infallible> {
566            Ok(TestHandler)
567        }
568        fn handshake_done(&mut self, _conn: &mut Connection<Self>) {
569            let mut guard = HANDSHAKE_DONE.lock();
570            *guard = true;
571        }
572        fn data_received(&mut self, _connection: &mut Connection<Self>, direction: Direction) {
573            let mut guard = DATA_RECEIVED.lock();
574            *guard = Some(direction);
575        }
576        fn fin_received(&mut self, _connection: &mut Connection<Self>, direction: Direction) {
577            let mut guard = FIN_RECEIVED.lock();
578            *guard = Some(direction);
579        }
580        fn rst_received(
581            &mut self,
582            _connection: &mut Connection<Self>,
583            direction: Direction,
584            _extra: PacketExtra,
585        ) {
586            let mut guard = RST_RECEIVED.lock();
587            *guard = Some(direction);
588        }
589        fn stream_end(&mut self, _connection: &mut Connection<Self>, direction: Direction) {
590            let mut guard = STREAM_END.lock();
591            *guard = Some(direction);
592        }
593        fn will_retire(&mut self, _connection: &mut Connection<Self>) {
594            let mut guard = WILL_RETIRE.lock();
595            *guard = true;
596        }
597    }
598
    /// full 3-way handshake followed by one forward data packet
    #[test]
    fn simple() {
        initialize_logging();

        // first SYN, client -> server
        let hs1 = TcpMeta {
            src_addr: [91, 92, 144, 105].into(),
            src_port: 3161,
            dst_addr: [23, 146, 104, 1].into(),
            dst_port: 45143,
            seq_number: 1587232,
            ack_number: 0,
            flags: TcpFlags {
                syn: true,
                ..Default::default()
            },
            window: 256,
            option_window_scale: Some(2),
            option_timestamp: None,
        };

        let mut conn: Connection<TestHandler> = Connection::new((&hs1).into(), ()).unwrap();
        assert!(conn.handle_packet(&hs1, &[], &PacketExtra::None));
        // SYN/ACK, server -> client: after the swap, ack_number holds the
        // client's sequence number, which is acknowledged with +1
        let mut hs2 = swap_meta(&hs1);
        hs2.seq_number = 315848;
        hs2.ack_number += 1;
        hs2.flags.ack = true;
        assert!(conn.handle_packet(&hs2, &[], &PacketExtra::None));
        // final ACK, client -> server: acknowledges the server's SYN
        let mut hs3 = swap_meta(&hs2);
        hs3.ack_number += 1;
        hs3.flags.syn = false;
        assert!(conn.handle_packet(&hs3, &[], &PacketExtra::None));

        // the handler must have been told about the completed handshake;
        // reset the flag afterwards
        let mut hs_done = HANDSHAKE_DONE.lock();
        assert!(*hs_done);
        *hs_done = false;

        // forward data packet reusing the sequence numbers of the final ACK
        let data1 = hs3.clone();
        assert!(conn.handle_packet(&data1, b"test", &PacketExtra::None));
        assert_eq!(conn.forward_stream.readable_buffered_length(), 4);
    }
639}