protolens 0.2.1

TCP stream reassembly,application layer protocol parsing
Documentation
#![allow(unused)]
use crate::capture::CapPacket;
use crate::capture::PktHeader;
use crate::recognize::{Direction, ProtoID, recognize_pkt};
use etherparse::{IpHeader, PacketHeaders, TransportHeader};
use protolens::L7Proto as ProlensL7Proto;
use protolens::Prolens;
use protolens::Task;
use protolens::TransProto as ProlensTransProto;
use std::cmp::Ordering;
use std::ffi::c_void;
use std::sync::Mutex;
use std::{
    borrow::Borrow,
    cell::{Ref, RefCell, RefMut},
    net::IpAddr,
    ops::Deref,
    rc::Rc,
};
use tmohash::TmoHash;

const MAX_TABLE_CAPACITY: usize = 128;
const NODE_TIMEOUT: u128 = 10000;

pub struct Flow {
    table: RefCell<TmoHash<FlowKey, FlowNode>>,
}

impl Flow {
    pub fn new() -> Self {
        Flow {
            table: RefCell::new(TmoHash::new(MAX_TABLE_CAPACITY)),
        }
    }

    pub fn insert(&self, pkt: &CapPacket, now: u128) {
        if let Some(key) = make_key(pkt.header.borrow().as_ref().unwrap()) {
            let mut table = self.table.borrow_mut();
            if !table.contains_key(&key) {
                table.insert(key, FlowNode::new(&key, now));
            }
        }
    }

    pub fn get(&self, pkt: &CapPacket) -> Option<Ref<FlowNode>> {
        make_key(pkt.header.borrow().as_ref().unwrap())
            .map(|key| Ref::map(self.table.borrow(), |tbl| tbl.get(&key).unwrap()))
    }

    pub fn get_mut(&self, pkt: &CapPacket) -> Option<RefMut<FlowNode>> {
        make_key(pkt.header.borrow().as_ref().unwrap())
            .map(|key| RefMut::map(self.table.borrow_mut(), |tbl| tbl.get_mut(&key).unwrap()))
    }

    pub fn process_pkt(
        &self,
        pkt: &CapPacket,
        now: u128,
        prolens: &mut Prolens<CapPacket>,
    ) -> Option<RefMut<FlowNode>> {
        if let Some(mut node) = self.get_mut_node(pkt, now) {
            // [插入点] 数据包处理
            recognize_pkt(pkt, &mut node); // 协议识别
            node.parse(pkt.clone(), prolens);

            if node.both_fin(pkt.header.borrow().as_ref().unwrap()) {
                // [插入点] 流结束
                self.table.borrow_mut().remove(&node.key);
            }
            Some(node)
        } else {
            None
        }
    }

    pub fn timer(&self, now: u128) {
        // [插入点] 定时器
        // flow 本身的定时器
        self.timeout(now);
    }

    pub fn clear(&self) {}

    fn get_node(&self, pkt: &CapPacket, now: u128) -> Option<Ref<FlowNode>> {
        let key = make_key(pkt.header.borrow().as_ref().unwrap())?;
        if !self.table.borrow().contains_key(&key) {
            self.table
                .borrow_mut()
                .insert(key, FlowNode::new(&key, now));
        }
        self.get(pkt)
    }

    fn get_mut_node(&self, pkt: &CapPacket, now: u128) -> Option<RefMut<FlowNode>> {
        let key = make_key(pkt.header.borrow().as_ref().unwrap())?;
        if !self.table.borrow().contains_key(&key) {
            self.table
                .borrow_mut()
                .insert(key, FlowNode::new(&key, now));
        }
        self.get_mut(pkt)
    }

    // 流节点超时
    fn timeout(&self, now: u128) {
        self.table.borrow_mut().timeout(|_key, node| {
            if now - node.borrow().last_time >= NODE_TIMEOUT {
                // [插入点] 超时流结点处理
                // match node.pkt_dir {
                //     Direction::Client => node.stream.0.timeout(),
                //     Direction::Server => node.stream.1.timeout(),
                //     _ => ()
                // }
                true
            } else {
                false
            }
        })
    }
}

impl Drop for Flow {
    fn drop(&mut self) {}
}

fn make_key(header: &PktHeader) -> Option<FlowKey> {
    if let Some(key) = pkt_key(header) {
        match key.addr1.cmp(&key.addr2) {
            Ordering::Greater => Some(key),
            Ordering::Less => swap_key(key),
            Ordering::Equal => cmp_port(key),
        }
    } else {
        None
    }
}

fn swap_key(key: FlowKey) -> Option<FlowKey> {
    Some(FlowKey {
        addr1: key.addr2,
        port1: key.port2,
        addr2: key.addr1,
        port2: key.port1,
        ..key
    })
}

fn cmp_port(key: FlowKey) -> Option<FlowKey> {
    match key.port1.cmp(&key.port2) {
        Ordering::Greater | Ordering::Equal => Some(key),
        Ordering::Less => swap_key(key),
    }
}

fn pkt_key(header: &PktHeader) -> Option<FlowKey> {
    match &header.ip {
        Some(IpHeader::Version4(ipv4h, _)) => Some(FlowKey {
            addr1: Some(ipv4h.source.into()),
            port1: header.sport(),
            addr2: Some(ipv4h.destination.into()),
            port2: header.dport(),
            trans_proto: trans_proto(header),
        }),
        Some(IpHeader::Version6(ipv6h, _)) => Some(FlowKey {
            addr1: Some(ipv6h.source.into()),
            port1: header.sport(),
            addr2: Some(ipv6h.destination.into()),
            port2: header.dport(),
            trans_proto: trans_proto(header),
        }),
        None => None,
    }
}

fn trans_proto(header: &PktHeader) -> TransProto {
    match &header.transport {
        Some(TransportHeader::Udp(_)) => TransProto::Udp,
        Some(TransportHeader::Tcp(_)) => TransProto::Tcp,
        Some(TransportHeader::Icmpv4(_)) => TransProto::Icmp4,
        Some(TransportHeader::Icmpv6(_)) => TransProto::Icmp6,
        None => TransProto::Unknown,
    }
}

#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct FlowKey {
    pub addr1: Option<IpAddr>,
    pub port1: u16,
    pub addr2: Option<IpAddr>,
    pub port2: u16,
    trans_proto: TransProto,
}

#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
enum TransProto {
    Udp,
    Tcp,
    Icmp4,
    Icmp6,
    Unknown,
}

#[derive(Debug, Clone, Copy)]
pub enum KeyDir {
    Addr1Client,
    Addr2Client,
    Unknown,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
enum StreamState {
    Fin,
    Unknown,
}

#[derive(Debug)]
pub struct FlowNode {
    pub key: FlowKey,
    pub key_dir: KeyDir,
    pub last_time: u128,
    pub proto_id: ProtoID,
    pub pkt_dir: Direction,
    client_stat: StreamState,
    server_stat: StreamState,

    parser_task: Option<Task<CapPacket>>,
    // 解码出来的元数据
    user: Rc<RefCell<Vec<u8>>>,
    pass: Rc<RefCell<Vec<u8>>>,
}

impl FlowNode {
    fn new(node_key: &FlowKey, now: u128) -> Self {
        FlowNode {
            key: *node_key,
            key_dir: KeyDir::Unknown,
            last_time: now,
            proto_id: ProtoID::Unknown,
            pkt_dir: Direction::Unknown,
            client_stat: StreamState::Unknown,
            server_stat: StreamState::Unknown,

            parser_task: None,
            user: Rc::new(RefCell::new(Vec::<u8>::new())),
            pass: Rc::new(RefCell::new(Vec::<u8>::new())),
        }
    }

    fn client_ip(&self) -> Option<IpAddr> {
        match self.key_dir {
            KeyDir::Addr1Client => self.key.addr1,
            KeyDir::Addr2Client => self.key.addr2,
            KeyDir::Unknown => None,
        }
    }

    fn client_port(&self) -> u16 {
        match self.key_dir {
            KeyDir::Addr1Client => self.key.port1,
            KeyDir::Addr2Client => self.key.port2,
            KeyDir::Unknown => 0,
        }
    }

    fn server_ip(&self) -> Option<IpAddr> {
        match self.key_dir {
            KeyDir::Addr1Client => self.key.addr2,
            KeyDir::Addr2Client => self.key.addr1,
            KeyDir::Unknown => None,
        }
    }

    fn server_port(&self) -> u16 {
        match self.key_dir {
            KeyDir::Addr1Client => self.key.port2,
            KeyDir::Addr2Client => self.key.port2,
            KeyDir::Unknown => 0,
        }
    }

    fn both_fin(&mut self, header: &PktHeader) -> bool {
        if let Some(TransportHeader::Tcp(tcph)) = &header.transport {
            if !tcph.fin && !tcph.rst {
                return false;
            }

            match &header.ip {
                Some(IpHeader::Version4(ipv4h, _)) => {
                    if self.client_ip().unwrap() == Into::<IpAddr>::into(ipv4h.source)
                        && self.client_port() == tcph.source_port
                    {
                        self.client_stat = StreamState::Fin;
                    } else {
                        self.server_stat = StreamState::Fin;
                    }
                }
                Some(IpHeader::Version6(ipv6h, _)) => {
                    if self.client_ip().unwrap() == Into::<IpAddr>::into(ipv6h.source)
                        && self.client_port() == tcph.source_port
                    {
                        self.client_stat = StreamState::Fin;
                    } else {
                        self.server_stat = StreamState::Fin;
                    }
                }
                None => {
                    return false;
                }
            }

            self.client_stat == StreamState::Fin && self.server_stat == StreamState::Fin
        } else {
            false
        }
    }

    fn parse(&mut self, pkt: CapPacket, prolens: &mut Prolens<CapPacket>) {
        self.init_parser_task(prolens);

        if let Some(ref mut task) = self.parser_task {
            prolens.run_task(task, pkt);

            let user_data = self.user.as_ref().borrow();
            if !user_data.is_empty() {
                println!(
                    "self node hold user: {}",
                    String::from_utf8_lossy(&user_data)
                );
            }

            let pass_data = self.pass.as_ref().borrow();
            if !pass_data.is_empty() {
                println!(
                    "self node hold pass: {}",
                    String::from_utf8_lossy(&pass_data)
                );
            }
        }
    }

    fn init_parser_task(&mut self, prolens: &mut Prolens<CapPacket>) {
        if self.parser_task.is_some() {
            return;
        }

        // 设置用户名回调
        let mut user_data = self.user.clone();
        let user_callback = move |user: &[u8], seq: u32, _cb_ctx: *mut c_void| {
            let mut user_guard = user_data.borrow_mut(); // 捕获flownode上下文
            *user_guard = user.to_vec();
            println!(
                "in callback: get user: {}, seq: {}",
                std::str::from_utf8(user).unwrap(),
                seq
            );
        };

        // 设置密码回调
        let mut pass_data = self.pass.clone();
        let pass_callback = move |pass: &[u8], seq: u32, _cb_ctx: *mut c_void| {
            let mut pass_guard = pass_data.borrow_mut(); // 捕获flownode上下文
            *pass_guard = pass.to_vec();
            println!(
                "in callback: get pass: {}, seq: {}",
                std::str::from_utf8(pass).unwrap(),
                seq
            );
        };

        prolens.set_cb_smtp_user(user_callback);
        prolens.set_cb_smtp_pass(pass_callback);

        let mut task = prolens.new_task(ProlensTransProto::Tcp);
        prolens.set_task_parser(&mut task, ProlensL7Proto::Smtp);

        self.parser_task = Some(task);
    }
}