pako-core 0.1.2

pako (main library)
Documentation
use std::{
    cmp::Ordering,
    collections::{HashMap, VecDeque},
    fmt,
    net::IpAddr,
    num::Wrapping,
};

use log::{debug, info, trace, warn};
use pako_tools::{Duration, Flow, FlowID};
use pnet_macros_support::packet::Packet as PnetPacket;
use pnet_packet::tcp::{TcpFlags, TcpPacket};

/// Compile-time switch for the extra hole/overlap diagnostics that
/// `queue_segment` runs before inserting a segment. The checks only emit log
/// messages; they are currently disabled.
const EARLY_DETECT_OVERLAP: bool = false;

/// Connection state tracked for each side of a TCP stream.
///
/// Variant names follow the classic TCP state machine; not every state is
/// necessarily entered by the code in this module (hence `dead_code`).
#[derive(Debug, Eq, PartialEq)]
#[allow(dead_code)]
pub enum TcpStatus {
    /// No connection (initial state, also reached after RST or expiration)
    Closed = 0,
    /// Side that has been sent a SYN and has not answered yet
    Listen,
    /// Side that sent the initial SYN
    SynSent,
    /// Side that answered a SYN with its own SYN+ACK
    SynRcv,
    /// Handshake finished (or connection picked up on the fly)
    Established,
    /// Closing-sequence state
    Closing,
    /// Closing-sequence state: the other side sent a FIN
    CloseWait,
    /// Closing-sequence state: this side sent the first FIN
    FinWait1,
    /// Closing-sequence state: this side's FIN was acknowledged
    FinWait2,
    /// Closing-sequence state: this side sent its FIN after the peer's
    LastAck,
    /// Closing-sequence state: waiting for the final ACK
    TimeWait,
}

impl Default for TcpStatus {
    /// A connection side starts out `Closed`.
    fn default() -> Self {
        Self::Closed
    }
}

/// A TCP segment queued for reassembly.
///
/// Sequence and acknowledgement numbers are stored relative to the initial
/// sequence numbers of the connection, using wrapping 32-bit arithmetic.
#[derive(Debug)]
pub struct TcpSegment {
    /// Sequence number of the first payload byte, relative to the sender's ISN
    pub rel_seq: Wrapping<u32>,
    /// Acknowledgement number, relative to the other side's ISN
    pub rel_ack: Wrapping<u32>,
    /// TCP flags of the packet that carried this segment
    pub flags: u16,
    /// Payload bytes
    pub data: Vec<u8>,
    /// Index (as supplied by the caller) of the packet this segment came from
    pub pcap_index: usize,
}

impl TcpSegment {
    /// Return the offset of the overlapping area if `self` (as left) overlaps on `right`
    ///
    /// The offset is counted from the start of `self.data`. `None` is returned
    /// when `self` ends at or before the start of `right`, and also when
    /// `right` starts *before* `self`: in that case `self` is not the left
    /// segment, and computing `right.rel_seq - self.rel_seq` would wrap around
    /// to a huge, meaningless offset.
    pub fn overlap_offset(&self, right: &TcpSegment) -> Option<usize> {
        if right.rel_seq < self.rel_seq {
            // precondition violated: `self` does not start at or before `right`
            return None;
        }
        let next_seq = self.rel_seq + Wrapping(self.data.len() as u32);
        if next_seq > right.rel_seq {
            let overlap_offset = (right.rel_seq - self.rel_seq).0 as usize;
            Some(overlap_offset)
        } else {
            None
        }
    }

    /// Splits the segment into two at the given offset.
    ///
    /// After the call, `self` keeps the first `offset` bytes. The returned
    /// segment holds the remaining bytes, starts at `self.rel_seq + offset`,
    /// and copies `rel_ack`, `flags` and `pcap_index` from `self`.
    ///
    /// # Panics
    ///
    /// Panics if `offset > self.data.len()`. In debug builds, it also panics if
    /// `offset == self.data.len()` (which would produce an empty tail).
    pub fn split_off(&mut self, offset: usize) -> TcpSegment {
        debug_assert!(offset < self.data.len());
        let remaining = self.data.split_off(offset);
        let rel_seq = self.rel_seq + Wrapping(offset as u32);
        TcpSegment {
            data: remaining,
            rel_seq,
            // remaining fields (rel_ack, flags, pcap_index) are `Copy`
            ..*self
        }
    }
}

/// Reassembly state for one side (client or server) of a TCP stream.
///
/// All "rel" values are relative to this peer's `isn` and use wrapping
/// 32-bit arithmetic.
pub struct TcpPeer {
    /// Initial Seq number (absolute)
    isn: Wrapping<u32>,
    /// Initial Ack number (absolute); set from the other side's sequence
    /// number, or from the ACK field when a connection is picked up on the fly
    ian: Wrapping<u32>,
    /// Next Seq number (relative): end of the last segment handed out, plus
    /// one if that segment carried a FIN
    next_rel_seq: Wrapping<u32>,
    /// Last acknowledged number (relative), as acknowledged by the other side
    last_rel_ack: Wrapping<u32>,
    /// Connection state
    status: TcpStatus,
    /// The current list of segments (ordered by rel_seq) sent by this peer and
    /// not yet acknowledged by the other side
    segments: VecDeque<TcpSegment>,
    /// DEBUG: host address
    addr: IpAddr,
    /// DEBUG: port
    port: u16,
}

impl TcpPeer {
    /// Inserts `s` into the segment queue, keeping it ordered by `rel_seq`.
    ///
    /// The segment goes in front of the first queued segment whose relative
    /// sequence number is strictly greater (so it lands *after* any segment
    /// with the same `rel_seq`); if there is none, it is appended at the back.
    fn insert_sorted(&mut self, s: TcpSegment) {
        let slot = self
            .segments
            .iter()
            .position(|queued| queued.rel_seq > s.rel_seq);
        match slot {
            Some(index) => self.segments.insert(index, s),
            None => self.segments.push_back(s),
        }
    }
}

/// Both sides of a tracked TCP connection, plus stream-level bookkeeping.
pub struct TcpStream {
    /// Side built from the flow's source address/port
    pub client: TcpPeer,
    /// Side built from the flow's destination address/port
    pub server: TcpPeer,
    /// Stream-level state, updated while the handshake is processed
    pub status: TcpStatus,
    // XXX timestamp of last seen packet
    /// Timestamp of the last packet seen for this stream (used for expiration)
    pub last_seen_ts: Duration,
}

/// Table of all TCP streams currently being reassembled.
pub struct TcpStreamReassembly {
    /// Tracked streams, keyed by flow identifier
    pub m: HashMap<FlowID, TcpStream>,

    /// Maximum gap allowed between two packets of a stream before the stream
    /// is considered expired
    pub timeout: Duration,
}

impl Default for TcpStreamReassembly {
    /// Empty stream table with the default timeout of `Duration::new(14400, 0)`
    /// (presumably 14400 seconds, i.e. 4 hours — confirm against `pako_tools::Duration`).
    fn default() -> Self {
        let timeout = Duration::new(14400, 0);
        Self {
            m: HashMap::new(),
            timeout,
        }
    }
}

/// Errors reported while updating the state of a TCP stream.
#[derive(Debug, Eq, PartialEq)]
pub enum TcpStreamError {
    /// Packet does not fit the expected state (for ex. the first packet of a
    /// stream carries neither SYN nor ACK)
    Anomaly,
    /// Connection is OK, but sides are inverted
    Inverted,
    /// Packet received but connection has expired
    Expired,
    /// Acknowledgement number seen during the handshake does not match the
    /// expected value
    HandshakeFailed,
}

impl TcpPeer {
    /// Creates the state for one side of a connection.
    ///
    /// The peer starts `Closed`, with all sequence/acknowledgement counters at
    /// zero and an empty segment queue. `addr` is copied into the peer.
    pub fn new(addr: &IpAddr, port: u16) -> Self {
        let zero = Wrapping(0);
        Self {
            addr: *addr,
            port,
            status: TcpStatus::Closed,
            isn: zero,
            ian: zero,
            next_rel_seq: zero,
            last_rel_ack: zero,
            segments: VecDeque::new(),
        }
    }
}

impl TcpStream {
    pub fn new(flow: &Flow) -> Self {
        TcpStream {
            client: TcpPeer::new(&flow.five_tuple.src, flow.five_tuple.src_port),
            server: TcpPeer::new(&flow.five_tuple.dst, flow.five_tuple.dst_port),
            status: TcpStatus::Closed,
            last_seen_ts: flow.last_seen,
        }
    }

    /// Processes a packet sent by a peer that has not completed the handshake
    /// yet (sender status `Closed`, `Listen`, `SynSent` or `SynRcv`).
    ///
    /// The function tracks the three-way handshake, records the initial
    /// sequence numbers of both sides, and queues any payload carried by the
    /// handshake packets on the sender's side. If the first packet seen is not
    /// a SYN but has the ACK flag, the connection is picked up on the fly and
    /// both sides are set to `Established`.
    ///
    /// `to_server` tells whether the packet goes from client to server.
    /// `pcap_index` is stored in queued segments and used in log messages.
    ///
    /// # Returns
    ///
    /// This function never returns reassembled data: the success value is
    /// always `Ok(None)`.
    ///
    /// # Errors
    ///
    /// - `Anomaly`: the first packet carries neither SYN nor ACK, or the
    ///   sender is already past the handshake states
    /// - `Inverted`: the first packet is a SYN+ACK; client and server have
    ///   been swapped and the caller should do the same for the flow
    /// - `HandshakeFailed`: the acknowledgement number does not match the
    ///   other side's initial sequence number
    pub fn handle_new_connection(
        &mut self,
        tcp: &TcpPacket,
        to_server: bool,
        pcap_index: usize,
    ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
        let seq = Wrapping(tcp.get_sequence());
        let ack = Wrapping(tcp.get_acknowledgement());
        let tcp_flags = tcp.get_flags();

        let (src, dst) = if to_server {
            (&mut self.client, &mut self.server)
        } else {
            (&mut self.server, &mut self.client)
        };

        match src.status {
            // Client -- SYN --> Server
            TcpStatus::Closed => {
                if tcp_flags & TcpFlags::RST != 0 {
                    // TODO check if destination.segments must be removed
                    // client sent a RST, this is expected
                    return Ok(None);
                }
                if tcp_flags & TcpFlags::SYN == 0 {
                    // not a SYN - usually happens at start of pcap if missed SYN
                    warn!("First packet of a TCP stream is not a SYN");
                    // test is ACK + data, and set established if possible
                    if tcp_flags & TcpFlags::ACK != 0 {
                        trace!("Trying to catch connection on the fly");
                        // use the numbers of this packet as reference points,
                        // so that this segment starts at relative offset 0
                        src.isn = seq;
                        src.ian = ack;
                        src.next_rel_seq = Wrapping(0);
                        src.status = TcpStatus::Established;
                        dst.isn = ack;
                        dst.ian = seq;
                        dst.status = TcpStatus::Established;
                        dst.last_rel_ack = Wrapping(0);
                        self.status = TcpStatus::Established;
                        // queue segment (even if FIN, to get correct seq numbers)
                        let segment = TcpSegment {
                            rel_seq: Wrapping(0),
                            rel_ack: Wrapping(0),
                            flags: tcp_flags,
                            data: tcp.payload().to_vec(), // XXX data cloned here
                            pcap_index,
                        };
                        queue_segment(src, segment);

                        return Ok(None);
                    }
                    return Err(TcpStreamError::Anomaly);
                }
                if tcp_flags & TcpFlags::ACK != 0 {
                    // SYN+ACK: the sender of this packet is actually the server
                    warn!("First packet is SYN+ACK - missed SYN?");
                    dst.isn = ack - Wrapping(1);
                    dst.status = TcpStatus::SynSent;
                    dst.next_rel_seq = Wrapping(1);
                    src.isn = seq;
                    src.ian = ack;
                    src.last_rel_ack = Wrapping(1);
                    src.next_rel_seq = Wrapping(1);
                    src.status = TcpStatus::Listen;
                    // swap sides and tell analyzer to do the same for flow
                    std::mem::swap(&mut self.client, &mut self.server);
                    return Err(TcpStreamError::Inverted);
                }
                // regular SYN
                src.isn = seq;
                src.next_rel_seq = Wrapping(1);
                dst.ian = seq;
                self.status = TcpStatus::SynSent;
                src.status = TcpStatus::SynSent;
                dst.status = TcpStatus::Listen;
                // do we have data ?
                if !tcp.payload().is_empty() {
                    warn!("Data in handshake SYN");
                    // conn.next_rel_seq += Wrapping(tcp.payload().len() as u32);
                    let segment = TcpSegment {
                        rel_seq: seq - src.isn,
                        rel_ack: ack - dst.isn,
                        flags: tcp_flags,
                        data: tcp.payload().to_vec(), // XXX data cloned here
                        pcap_index,
                    };
                    queue_segment(src, segment);
                }
            }
            // Server -- SYN+ACK --> Client
            TcpStatus::Listen => {
                // XXX flags are not checked here: a packet other than SYN+ACK
                // is accepted as long as its ack number matches
                // if we had data in SYN, add its length
                let next_rel_seq = if dst.segments.is_empty() {
                    Wrapping(1)
                } else {
                    Wrapping(1) + Wrapping(dst.segments[0].data.len() as u32)
                };
                if ack != dst.isn + next_rel_seq {
                    warn!("NEW/SYN-ACK: ack number is wrong");
                    return Err(TcpStreamError::HandshakeFailed);
                }
                src.isn = seq;
                src.next_rel_seq = Wrapping(1);
                dst.ian = seq;
                dst.last_rel_ack = Wrapping(1);

                src.status = TcpStatus::SynRcv;
                self.status = TcpStatus::SynRcv;

                // do not push data if we had some in SYN, it will be done after handshake succeeds
            }
            // Client -- ACK --> Server
            TcpStatus::SynSent => {
                if tcp_flags & TcpFlags::ACK == 0 {
                    if tcp_flags == TcpFlags::SYN {
                        // can be a SYN resend
                        if seq == src.isn && ack.0 == 0 {
                            trace!("SYN resend - ignoring");
                            return Ok(None);
                        }
                        // can be a disordered handshake (receive S after SA)
                        if seq + Wrapping(1) == dst.ian {
                            trace!("Likely received SA before S - ignoring");
                            return Ok(None);
                        }
                    }
                    warn!("Not an ACK");
                }
                // TODO check seq, ack
                if ack != dst.isn + Wrapping(1) {
                    warn!("NEW/ACK: ack number is wrong");
                    return Err(TcpStreamError::HandshakeFailed);
                }
                src.status = TcpStatus::Established;
                dst.status = TcpStatus::Established;
                dst.last_rel_ack = Wrapping(1);
                self.status = TcpStatus::Established;
                // do we have data ?
                if !tcp.payload().is_empty() {
                    // warn!("Data in handshake ACK");
                    let segment = TcpSegment {
                        rel_seq: seq - src.isn,
                        rel_ack: ack - dst.isn,
                        flags: tcp_flags,
                        data: tcp.payload().to_vec(), // XXX data cloned here
                        pcap_index,
                    };
                    queue_segment(src, segment);
                }
            }
            TcpStatus::SynRcv => {
                // we received something while in SYN_RCV state - we should only have sent ACK
                // this could be a SYN+ACK retransmit
                if tcp_flags == TcpFlags::SYN | TcpFlags::ACK {
                    // XXX compare SEQ numbers?
                    // ignore
                    return Ok(None);
                }
                warn!(
                    "Received unexpected data in SYN_RCV state idx={}",
                    pcap_index
                );
            }
            // The sender is already past the handshake: this function is
            // public, so report the misuse as an error instead of panicking.
            TcpStatus::Established
            | TcpStatus::Closing
            | TcpStatus::CloseWait
            | TcpStatus::FinWait1
            | TcpStatus::FinWait2
            | TcpStatus::LastAck
            | TcpStatus::TimeWait => {
                warn!(
                    "Handshake handler called for peer in state {:?} idx={}",
                    src.status, pcap_index
                );
                return Err(TcpStreamError::Anomaly);
            }
        }
        Ok(None)
    }

    /// Processes a packet of an established connection.
    ///
    /// The payload (or FIN) is queued on the sender's side. If the packet has
    /// the ACK flag, the segments of the *other* side that it acknowledges are
    /// released and returned.
    ///
    /// A packet without the ACK flag but with a non-zero acknowledgement
    /// number is logged and ignored.
    ///
    /// `to_server` tells whether the packet goes from client to server.
    /// This function currently never returns an error.
    pub fn handle_established_connection(
        &mut self,
        tcp: &TcpPacket,
        to_server: bool,
        pcap_index: usize,
    ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
        let (sender, receiver) = match to_server {
            true => (&mut self.client, &mut self.server),
            false => (&mut self.server, &mut self.client),
        };

        let flags = tcp.get_flags();
        let has_ack = flags & TcpFlags::ACK != 0;
        let payload = tcp.payload();
        // numbers relative to each side's initial sequence number
        let rel_seq = Wrapping(tcp.get_sequence()) - sender.isn;
        let rel_ack = Wrapping(tcp.get_acknowledgement()) - receiver.isn;

        trace!("EST: payload len={}", payload.len());
        trace!(
            "    Tcp rel seq {} ack {} next seq {}",
            rel_seq,
            rel_ack,
            sender.next_rel_seq
        );

        // an acknowledgement number without the ACK flag makes no sense
        if !has_ack && tcp.get_acknowledgement() != 0 {
            warn!(
                "EST/ packet without ACK (broken TCP implementation or attack) idx={}",
                pcap_index
            );
            return Ok(None);
        }

        queue_segment(
            sender,
            TcpSegment {
                rel_seq,
                rel_ack,
                flags,
                data: payload.to_vec(), // payload is copied here
                pcap_index,
            },
        );

        // an ACK releases the acknowledged segments queued on the other side
        let delivered = match has_ack {
            true => send_peer_segments(receiver, rel_ack),
            false => None,
        };

        trace!(
            "    PEER EST rel next seq {} last_ack {}",
            receiver.next_rel_seq,
            receiver.last_rel_ack,
        );

        Ok(delivered)
    }

    /// Processes a packet while the connection is being closed (FIN/RST
    /// exchange) and advances the state of both sides.
    ///
    /// If the packet has the ACK flag, the acknowledged segments of the other
    /// side are released first; they are the return value in every non-ignored
    /// case. A packet without the ACK flag but with a non-zero acknowledgement
    /// number is logged and ignored (`None`).
    ///
    /// On RST, the other side's queued segments whose `rel_ack` equals the
    /// relative sequence number of the RST are dropped, and the sender becomes
    /// `Closed`. Otherwise, the segment is queued on the sender's side and the
    /// closing state machine is updated.
    fn handle_closing_connection(
        &mut self,
        tcp: &TcpPacket,
        to_server: bool,
        pcap_index: usize,
    ) -> Option<Vec<TcpSegment>> {
        let (origin, destination) = match to_server {
            true => (&mut self.client, &mut self.server),
            false => (&mut self.server, &mut self.client),
        };

        let tcp_flags = tcp.get_flags();
        let has_ack = tcp_flags & TcpFlags::ACK != 0;
        let has_fin = tcp_flags & TcpFlags::FIN != 0;
        let has_rst = tcp_flags & TcpFlags::RST != 0;
        let rel_seq = Wrapping(tcp.get_sequence()) - origin.isn;
        let rel_ack = Wrapping(tcp.get_acknowledgement()) - destination.isn;

        // an acknowledgement number without the ACK flag makes no sense
        if !has_ack && tcp.get_acknowledgement() != 0 {
            warn!(
                "EST/ packet without ACK (broken TCP implementation or attack) idx={}",
                pcap_index
            );
            return None;
        }

        let ret = if has_ack {
            trace!("ACKing segments up to {}", rel_ack);
            send_peer_segments(destination, rel_ack)
        } else {
            None
        };

        if has_rst {
            // drop the segments queued on the other side that acknowledge
            // exactly the sequence number of this RST
            destination.segments.retain(|s| s.rel_ack != rel_seq);
            trace!(
                "RST: {} remaining (undelivered) segments DESTINATION after removal",
                destination.segments.len()
            );
            origin.status = TcpStatus::Closed; // XXX except if ACK ?
            return ret;
        }

        // queue segment (even if FIN, to get correct seq numbers)
        queue_segment(
            origin,
            TcpSegment {
                rel_seq,
                rel_ack,
                flags: tcp_flags,
                data: tcp.payload().to_vec(), // payload is copied here
                pcap_index,
            },
        );

        match origin.status {
            TcpStatus::Established => {
                // we know there is a FIN (tested in TcpStreamReassembly::update)
                origin.status = TcpStatus::FinWait1;
                destination.status = TcpStatus::CloseWait; // we are not sure it was received
            }
            TcpStatus::CloseWait => match (has_fin, has_ack) {
                (true, _) => {
                    origin.status = TcpStatus::LastAck;
                    let peer_done = has_ack || destination.status == TcpStatus::FinWait2;
                    destination.status = if peer_done {
                        TcpStatus::TimeWait
                    } else {
                        TcpStatus::Closing
                    };
                }
                (false, true) => {
                    // only an ACK: stay in CloseWait, the other side's FIN is now acknowledged
                    if destination.status == TcpStatus::FinWait1 {
                        destination.status = TcpStatus::FinWait2;
                    }
                }
                (false, false) => {
                    warn!("Origin should have sent a FIN and/or ACK");
                }
            },
            TcpStatus::TimeWait => {
                // only an ACK should be sent (XXX nothing else, maybe PSH)
                if has_ack {
                    // this is the end!
                    origin.status = TcpStatus::Closed;
                    destination.status = TcpStatus::Closed;
                }
            }
            _ => {
                warn!(
                    "Unhandled closing transition: origin host {} status {:?}",
                    origin.addr, origin.status
                );
                warn!(
                    "    dest host {} status {:?}",
                    destination.addr, destination.status
                );
            }
        }

        trace!(
            "TCP connection closing, {} remaining (undelivered) segments",
            origin.segments.len()
        );
        // DEBUG: list what is still queued on the sender's side
        for (n, s) in origin.segments.iter().enumerate() {
            trace!(
                "  s[{}]: seq={} len={} idx={}",
                n,
                s.rel_seq.0,
                s.data.len(),
                s.pcap_index,
            );
        }

        // TODO what now? (undelivered segments are left in the queue)
        ret
    }

    /// Forces expiration of this stream (for ex. after a timeout): both sides
    /// are put back in the `Closed` state. Queued segments are left untouched.
    fn expire(&mut self) {
        self.server.status = TcpStatus::Closed;
        self.client.status = TcpStatus::Closed;
    }
} // TcpStream

/// Queues `segment` on the side of `peer` (the sender), keeping the queue
/// ordered by relative sequence number.
///
/// Segments without payload are dropped unless they carry a FIN, which must
/// be kept to get correct sequence numbers.
///
/// When `EARLY_DETECT_OVERLAP` is enabled, holes and overlaps with the
/// neighbouring queued segments are logged before insertion. The segment is
/// inserted unchanged in every case: merging/trimming at this stage is not
/// implemented, overlaps are resolved when segments are acknowledged.
fn queue_segment(peer: &mut TcpPeer, segment: TcpSegment) {
    // only store segments with data, except FIN
    let carries_fin = segment.flags & TcpFlags::FIN != 0;
    if segment.data.is_empty() && !carries_fin {
        return;
    }

    // trivial case: list is empty - just push segment
    if peer.segments.is_empty() {
        trace!("Pushing segment (front)");
        peer.segments.push_front(segment);
        return;
    }

    if EARLY_DETECT_OVERLAP {
        // last queued segment starting before the candidate
        let before = peer
            .segments
            .iter()
            .take_while(|s| s.rel_seq < segment.rel_seq)
            .last();
        // first queued segment starting at or after the candidate
        let after = peer
            .segments
            .iter()
            .find(|s| s.rel_seq >= segment.rel_seq);

        // left side: compare the candidate's start with the end of `before`
        if let Some(left) = before {
            let left_end = left.rel_seq + Wrapping(left.data.len() as u32);
            if segment.rel_seq > left_end {
                // we have a hole
                warn!("Missing segment on left of incoming segment");
            } else if segment.rel_seq < left_end {
                // XXX overlapping area is neither compared nor removed here
                warn!("Segment with left overlap");
            }
            // else: contiguous, simply queue the segment
        }

        // right side: compare the candidate's end with the start of `after`
        if let Some(right) = after {
            let incoming_end = segment.rel_seq + Wrapping(segment.data.len() as u32);
            if incoming_end > right.rel_seq {
                // XXX overlapping area is neither compared nor removed here
                warn!("Segment with right overlap");
            } else if incoming_end < right.rel_seq {
                trace!(
                    "hole remaining on right of incoming segment idx={}",
                    segment.pcap_index
                );
            }
        }
    }

    trace!("Adding segment");
    peer.insert_sorted(segment);
}

fn send_peer_segments(peer: &mut TcpPeer, rel_ack: Wrapping<u32>) -> Option<Vec<TcpSegment>> {
    trace!(
        "Trying to send segments for {}:{} up to {} (last ack: {})",
        peer.addr,
        peer.port,
        rel_ack,
        peer.last_rel_ack
    );
    if rel_ack == peer.last_rel_ack {
        trace!("re-acking last data, doing nothing");
        return None;
    }
    if peer.segments.is_empty() {
        return None;
    }

    // is ACK acceptable?
    if rel_ack < peer.last_rel_ack {
        warn!("ACK request for already ACKed data (ack < last_ack)");
        return None;
    }

    // check consistency of segment ACK numbers + order and/or missing fragments and/or overlap

    let mut acked = Vec::new();

    while !peer.segments.is_empty() {
        let segment = &peer.segments[0];
        trace!(
            "segment: rel_seq={}  len={}",
            segment.rel_seq,
            segment.data.len()
        );
        trace!(
            "  origin.next_rel_seq {} ack {}",
            peer.next_rel_seq,
            rel_ack
        );
        // if origin.next_rel_seq > rel_ack {
        //     warn!("next_seq > ack - partial ACK ?");
        //     unreachable!(); // XXX do we care about that case?
        //                     // break;
        // }
        if rel_ack <= segment.rel_seq {
            // if packet is in the past (strictly less), we don't care
            break;
        }

        // safety: segments is just tested above
        let mut segment = peer.segments.pop_front().unwrap();

        if rel_ack < segment.rel_seq + Wrapping(segment.data.len() as u32) {
            // warn!("ACK lower then seq + segment size - SACK?");
            trace!("ACK for part of buffer");
            // split data and insert new dummy segment
            trace!("rel_ack {} segment.rel_seq {}", rel_ack, segment.rel_seq);
            trace!("segment data len {}", segment.data.len());
            let acked_len = (rel_ack - segment.rel_seq).0 as usize;
            let new_segment = segment.split_off(acked_len);
            trace!(
                "insert new segment from {} len {}",
                new_segment.rel_ack,
                new_segment.data.len()
            );
            peer.insert_sorted(new_segment);
        }

        handle_overlap_linux(peer, &mut segment);
        adjust_seq_numbers(peer, &segment);

        trace!(
            "ACKed: pushing segment: rel_seq={} len={}",
            segment.rel_seq,
            segment.data.len(),
        );
        if !segment.data.is_empty() {
            acked.push(segment);
        }
    }

    if peer.next_rel_seq != rel_ack {
        // missed segments, or maybe received FIN ?
        warn!(
            "TCP ACKed unseen segment next_seq {} != ack {} (Missed segments?)",
            peer.next_rel_seq, rel_ack
        );
        // TODO notify upper layer for missing data
    }

    peer.last_rel_ack = rel_ack;
    Some(acked)
}

/// Policy selector for `handle_overlap_first_last`: when `true`, the data of
/// the packet with the lower `pcap_index` is kept for an overlapping area;
/// when `false`, the data of the packet with the higher `pcap_index` is kept.
const FIRST_WINS: bool = false;

// implements the "first segment wins" or the "last segment wins" policies
//
// `segment` has already been removed from the queue of `peer`. While it
// overlaps the first queued segment (`next`), `segment` is truncated at the
// start of the overlap and the overlapping areas are compared (a warning is
// logged if they differ). Data outside the common area is re-inserted in the
// queue, and `FIRST_WINS` decides which copy of the common area stays queued.
#[allow(dead_code)]
fn handle_overlap_first_last(peer: &mut TcpPeer, segment: &mut TcpSegment) {
    // loop while segment has overlap
    while let Some(next) = peer.segments.front() {
        if let Some(overlap_offset) = segment.overlap_offset(next) {
            let next_pcap_index = next.pcap_index;
            warn!(
                "segments overlaps next candidate (offset={})",
                overlap_offset
            );
            trace!("segment idx={}", segment.pcap_index);
            // split segment at overlapping_offset
            // (`segment` keeps the part before the overlap)
            let mut segment_right = segment.split_off(overlap_offset);
            let overlap_size;
            // segment right can be greater, equal or smaller to next
            match segment_right.data.len().cmp(&next.data.len()) {
                Ordering::Less => {
                    // right_segment is smaller than next
                    overlap_size = segment_right.data.len();
                    if segment_right.data[..] != next.data[..overlap_size] {
                        warn!(
                            "TCP overlapping data differ in packets idx={} and idx={}",
                            segment_right.pcap_index, next_pcap_index
                        );
                    }
                    // cut `next` so that the queued front covers exactly the
                    // common area; the part after it goes back in the queue
                    let first = peer.segments.front_mut().unwrap();
                    let front_right = first.split_off(overlap_size);
                    trace!("front_right idx={}", front_right.pcap_index);
                    trace!("re-inserting remaining data (next)");
                    peer.insert_sorted(front_right);
                }
                Ordering::Equal => {
                    if segment_right.data[..] != next.data[..] {
                        warn!(
                            "TCP overlapping data differ in packets idx={} and idx={}",
                            segment_right.pcap_index, next_pcap_index
                        );
                    }
                }
                Ordering::Greater => {
                    // right_segment is longer than next
                    overlap_size = next.data.len();
                    if segment_right.data[..overlap_size] != next.data[..] {
                        warn!(
                            "TCP overlapping data differ in packets idx={} and idx={}",
                            segment_right.pcap_index, next_pcap_index
                        );
                    }
                    // the part of `segment_right` after the common area goes
                    // back in the queue
                    let rem = segment_right.split_off(overlap_size);
                    trace!("re-inserting remaining data (first)");
                    peer.insert_sorted(rem);
                }
            }
            // which part to keep ? segment_right or next ?
            // trace!("FIRST_WINS: {}, l:{} r:{}", FIRST_WINS, segment.pcap_index, next_pcap_index);
            // trace!("(before)\n{:?}", peer);
            // at this point `segment_right` and the queued front cover the same area
            if FIRST_WINS ^ (segment.pcap_index > next_pcap_index) {
                // keep the data coming from `segment`
                trace!("dropping next");
                let _ = peer.segments.pop_front();
                peer.insert_sorted(segment_right);
            } else {
                // keep the data already queued
                trace!("dropping first");
                drop(segment_right);
            }
        // trace!("(after)\n{:?}", peer);
        } else {
            break;
        }
    }
}

// Resolves overlaps between `segment` (already removed from the queue) and the
// segments queued after it, using a linux-like policy: the original segment is
// favored, EXCEPT when the subsequent segment begins before the original, or
// begins at the same position and ends after the original segment.
//
// Here, every queued segment overlapping `segment` is removed from the queue:
// - if it ends within `segment`, it is dropped
// - otherwise, the part beyond the end of `segment` is appended to `segment`
// A warning is logged when the data of the common area differs.
#[allow(dead_code)]
fn handle_overlap_linux(peer: &mut TcpPeer, segment: &mut TcpSegment) {
    loop {
        // stop when the queue is empty, or when its first segment does not overlap
        let overlap_offset = match peer.segments.front() {
            Some(candidate) => match segment.overlap_offset(candidate) {
                Some(offset) => offset,
                None => return,
            },
            None => return,
        };
        warn!(
            "segment idx={} overlaps next candidate idx={} (at offset={})",
            segment.pcap_index, peer.segments[0].pcap_index, overlap_offset
        );
        // the subsequent segment is the one that gets modified
        // safety: element presence was tested just above
        let mut subsequent = peer.segments.pop_front().unwrap();

        // number of bytes of `segment` located at or after the start of `subsequent`
        let overlap_size = segment.data.len() - overlap_offset;
        let common_len = std::cmp::min(overlap_size, subsequent.data.len());
        let ours = &segment.data[overlap_offset..overlap_offset + common_len];
        let theirs = &subsequent.data[..common_len];
        if theirs != ours {
            warn!(
                "Overlap area differs! left idx={} right idx={}",
                segment.pcap_index, subsequent.pcap_index
            );
        }

        if overlap_size < subsequent.data.len() {
            // subsequent segment extends past the end of `segment`:
            // drop its overlapping head and merge its tail into `segment`
            let tail = subsequent.split_off(overlap_size);
            segment.data.extend_from_slice(&tail.data);
        }
        // otherwise the subsequent segment is fully covered and simply dropped
    }
}

/// Update the next expected relative sequence number of `origin` after `segment`.
///
/// When the segment carries data, `origin.next_rel_seq` is set to the sequence number
/// following the last byte of the segment. It is then advanced by one more if the
/// segment has the FIN flag set. A segment with no data and no FIN leaves it unchanged.
fn adjust_seq_numbers(origin: &mut TcpPeer, segment: &TcpSegment) {
    let payload_len = segment.data.len();
    if payload_len > 0 {
        // Recompute from the segment's own sequence number instead of incrementing:
        // adding the length to the current value is wrong in case of overlap.
        origin.next_rel_seq = segment.rel_seq + Wrapping(payload_len as u32);
    }

    let has_fin = segment.flags & TcpFlags::FIN != 0;
    if has_fin {
        origin.next_rel_seq += Wrapping(1);
    }
}

impl TcpStreamReassembly {
    /// Process one TCP packet belonging to `flow`.
    ///
    /// The stream registered for `flow.flow_id` is looked up, or created from `flow` if
    /// there is none yet. The packet is then dispatched according to the status of the
    /// sending peer (the client when `to_server` is true, the server otherwise):
    ///
    /// - `Closed`, `Listen`, `SynSent`, `SynRcv`: handled as a new connection,
    /// - `Established`: handled as a closing connection if FIN or RST is set, as an
    ///   established connection otherwise,
    /// - any other status: handled as a closing connection.
    ///
    /// `pcap_index` is forwarded to the handlers and used in log messages.
    ///
    /// # Errors
    ///
    /// Returns `TcpStreamError::Expired` (after expiring the stream) when the time elapsed
    /// since the previous packet of the stream exceeds `self.timeout`. Errors returned by
    /// the new-connection and established-connection handlers are propagated unchanged.
    pub(crate) fn update(
        &mut self,
        flow: &Flow,
        tcp: &TcpPacket,
        to_server: bool,
        pcap_index: usize,
    ) -> Result<Option<Vec<TcpSegment>>, TcpStreamError> {
        trace!("5-t: {}", flow.five_tuple);
        trace!("  flow id: {:x}", flow.flow_id);
        trace!(
            "  seq: {:x}  ack {:x}",
            tcp.get_sequence(),
            tcp.get_acknowledgement()
        );

        let stream = self
            .m
            .entry(flow.flow_id)
            .or_insert_with(|| TcpStream::new(flow));
        trace!("stream state: {:?}", stream.status);
        trace!("to_server: {}", to_server);

        // Compare with the timestamp of the previous packet *before* overwriting it.
        if stream.last_seen_ts > flow.last_seen {
            info!("packet received in past of stream idx={}", pcap_index);
        } else {
            let idle = flow.last_seen - stream.last_seen_ts;
            if idle > self.timeout {
                warn!("TCP stream received packet after timeout");
                stream.expire();
                return Err(TcpStreamError::Expired);
            }
        }
        // NOTE(review): a packet "in the past" also overwrites the timestamp, moving it
        // backwards -- confirm this is intended.
        stream.last_seen_ts = flow.last_seen;

        // The peer that sent this packet.
        let sender = if to_server {
            &stream.client
        } else {
            &stream.server
        };

        trace!(
            "origin: {}:{} status {:?}",
            sender.addr,
            sender.port,
            sender.status
        );
        let flags = tcp.get_flags();
        debug_print_tcp_flags(flags);

        match sender.status {
            TcpStatus::Closed | TcpStatus::Listen | TcpStatus::SynSent | TcpStatus::SynRcv => {
                stream.handle_new_connection(tcp, to_server, pcap_index)
            }
            TcpStatus::Established => {
                // FIN or RST on an established connection is a close request.
                let close_requested = flags & (TcpFlags::FIN | TcpFlags::RST) != 0;
                if !close_requested {
                    return stream.handle_established_connection(tcp, to_server, pcap_index);
                }
                trace!("Requesting end of connection");
                Ok(stream.handle_closing_connection(tcp, to_server, pcap_index))
            }
            _ => Ok(stream.handle_closing_connection(tcp, to_server, pcap_index)),
        }
    }

    /// Expire every stream that has been idle for longer than `self.timeout` at time `now`.
    ///
    /// A stream whose last-seen timestamp is later than `now` is reported with a warning
    /// and left untouched.
    pub(crate) fn check_expired_connections(&mut self, now: Duration) {
        for (flow_id, stream) in self.m.iter_mut() {
            if now < stream.last_seen_ts {
                // The elapsed time cannot be computed for a timestamp in the future.
                warn!(
                    "stream.last_seen_ts is in the future for flow id {:x}",
                    flow_id
                );
            } else if now - stream.last_seen_ts > self.timeout {
                warn!("TCP stream timeout reached for flow {:x}", flow_id);
                stream.expire();
            }
        }
    }
}

/// Forget every TCP stream tracked by `analyzer`.
///
/// Logs a warning, then emits one debug line for each tracked stream whose flow is
/// found in `analyzer.flows`, and finally clears the table of streams.
pub(crate) fn finalize_tcp_streams(analyzer: &mut crate::analyzer::Analyzer) {
    warn!("expiring all TCP connections");
    // Only the flow identifiers are needed here, the streams themselves are not inspected.
    for flow_id in analyzer.tcp_defrag.m.keys().copied() {
        // TODO do we have anything to do?
        if let Some(flow) = analyzer.flows.get_flow(flow_id) {
            debug!("  flow: {:?}", flow);
        }
    }
    analyzer.tcp_defrag.m.clear();
}

/// Log the TCP flags set in `tcp_flags` at trace level, e.g. `tcp_flags: [SA]`.
///
/// One letter is written per flag that is set, always in this order: `S` (SYN),
/// `F` (FIN), `R` (RST), `U` (URG), `P` (PSH), `A` (ACK). Any other bit of
/// `tcp_flags` is ignored.
fn debug_print_tcp_flags(tcp_flags: u16) {
    // The message is emitted with `trace!`, so the trace level is the one to test
    // (the previous guard compared against the debug level, and only statically).
    // Returning early avoids building the string when the record would be discarded.
    if !log::log_enabled!(log::Level::Trace) {
        return;
    }

    // Flag masks and their letters, in output order.
    const FLAG_LETTERS: [(u16, char); 6] = [
        (TcpFlags::SYN, 'S'),
        (TcpFlags::FIN, 'F'),
        (TcpFlags::RST, 'R'),
        (TcpFlags::URG, 'U'),
        (TcpFlags::PSH, 'P'),
        (TcpFlags::ACK, 'A'),
    ];

    let mut s = String::from("tcp_flags: [");
    for &(mask, letter) in FLAG_LETTERS.iter() {
        if tcp_flags & mask != 0 {
            s.push(letter);
        }
    }
    s.push(']');
    trace!("{}", s);
}

/// Multi-line debug representation of a peer: address and port, status, initial
/// sequence/ack numbers, next sequence and last ack numbers, then one line per
/// queued segment (relative sequence number, data length and pcap index).
impl fmt::Debug for TcpPeer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let queued = &self.segments;

        // Peer summary, one field group per line.
        writeln!(f, "Peer: {}:{}", self.addr, self.port)?;
        writeln!(f, "  status: {:?}", self.status)?;
        writeln!(f, "  isn: 0x{:x}  ian: 0x{:x}", self.isn, self.ian)?;
        writeln!(f, "  next_rel_seq: {}", self.next_rel_seq)?;
        writeln!(f, "  last_rel_ack: {}", self.last_rel_ack)?;
        writeln!(f, "  #segments: {}", queued.len())?;

        // Queued segments, in queue order; stops at the first formatting error.
        queued.iter().enumerate().try_for_each(|(position, seg)| {
            writeln!(
                f,
                "    s[{}]: rel_seq={} len={} idx={}",
                position,
                seg.rel_seq,
                seg.data.len(),
                seg.pcap_index,
            )
        })
    }
}