nsave 0.1.0

capturing and saving packets
Documentation
use crate::common::*;
use crate::configure::*;
use crate::mmapbuf::*;
use crate::packet::*;
use crate::search_ti::*;
use bincode::deserialize_from;
use chrono::{Duration, NaiveDateTime};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::HashSet;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
use std::{
    fs::{File, OpenOptions},
    path::{Path, PathBuf},
};

#[derive(Debug)]
pub struct TimeIndex {
    configure: &'static Configure,
    buf_writer: Option<MmapBufWriter>,
}

impl TimeIndex {
    pub fn new(configure: &'static Configure) -> Self {
        TimeIndex {
            configure,
            buf_writer: None,
        }
    }

    pub fn init_dir(&mut self, dir: &Path) -> Result<(), StoreError> {
        let mut path = PathBuf::new();
        path.push(dir);
        path.push("timeindex.ti");
        let result = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path);
        match result {
            Ok(fd) => {
                let meta = fd.metadata()?;
                self.buf_writer = Some(MmapBufWriter::with_arg(
                    fd,
                    meta.len(),
                    self.configure.ti_buff_size,
                ));
            }
            Err(e) => return Err(StoreError::IoError(e)),
        }
        Ok(())
    }

    pub fn change_dir(&mut self) -> Result<(), StoreError> {
        self.buf_writer = None;
        Ok(())
    }

    pub fn write(&mut self, record: LinkRecord) -> Result<u64, StoreError> {
        let ci_offset = self.buf_writer.borrow().as_ref().unwrap().next_offset();
        if let Some(ref mut writer) = self.buf_writer {
            if bincode::serialize_into(writer, &record).is_err() {
                return Err(StoreError::WriteError("time index write error".to_string()));
            }
        }
        Ok(ci_offset)
    }

    pub fn finish(&mut self) {
        self.buf_writer = None;
    }
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct LinkRecord {
    pub start_time: u128,
    pub end_time: u128,
    pub tuple5: PacketKey,
    pub ci_offset: u64,
}

impl PartialEq for LinkRecord {
    fn eq(&self, other: &Self) -> bool {
        self.start_time == other.start_time && self.tuple5 == other.tuple5
    }
}

impl Eq for LinkRecord {}

impl Hash for LinkRecord {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.start_time.hash(state);
        self.tuple5.hash(state);
    }
}

impl fmt::Display for LinkRecord {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "LinkRecord {{ start_time: {} ({}), end_time: {} ({}), tuple5: {:?}, ci_offset: {} }}",
            ts_date(self.start_time),
            self.start_time,
            ts_date(self.end_time),
            self.end_time,
            self.tuple5,
            self.ci_offset
        )
    }
}

#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub struct SearchKey {
    pub start_time: Option<NaiveDateTime>,
    pub end_time: Option<NaiveDateTime>,
    pub sip: Option<IpAddr>,
    pub dip: Option<IpAddr>,
    pub sport: Option<u16>,
    pub dport: Option<u16>,
    pub protocol: Option<TransProto>,
}

impl fmt::Display for SearchKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "SearchKey {{ start_time: {:?} ({:?}), end_time: {:?} ({:?}), sport: {:?}, dport: {:?}, protocol: {:?} }}",
            self.start_time,
            date_ts(self.start_time),
            self.end_time,
            date_ts(self.end_time),
            self.sport,
            self.dport,
            self.protocol
        )
    }
}

pub fn dump_timeindex_file(path: PathBuf) -> Result<(), StoreError> {
    match path.extension() {
        Some(ext) => {
            if !ext.to_str().unwrap().eq("ti") {
                return Err(StoreError::CliError("not timeindex file".to_string()));
            }
        }
        None => return Err(StoreError::CliError("not timeindex file".to_string())),
    };

    let file = match OpenOptions::new()
        .read(true)
        .write(true)
        .create(false)
        .truncate(false)
        .open(&path)
    {
        Ok(fd) => fd,
        Err(e) => {
            return Err(StoreError::CliError(format!("open file error: {}", e)));
        }
    };
    let mut mmap_reader = MmapBufReader::new(file);
    println!("dump timeindex file: {:?}", path);
    while let Ok(record) = deserialize_from::<_, LinkRecord>(&mut mmap_reader) {
        println!("{}", record);
    }
    Ok(())
}

pub fn ti_search(
    configure: &'static Configure,
    search_key: SearchKey,
    dir_id: u64,
) -> Vec<LinkRecord> {
    let mut ti_set = HashSet::new();
    let mut search_date = search_key.start_time.unwrap();
    while search_date < search_key.end_time.unwrap() {
        if let Ok((ti_file, _ti_file_path)) = date2ti_file(configure, search_date, dir_id) {
            ti_set = search_ti(search_key, ti_file)
                .into_iter()
                .collect::<HashSet<_>>();
        }
        search_date += Duration::try_minutes(1).unwrap();
    }
    ti_set.into_iter().collect()
}

fn search_ti(search_key: SearchKey, ti_file: File) -> Vec<LinkRecord> {
    let mut record: Vec<LinkRecord> = Vec::new();
    let mut reader = MmapBufReader::new(ti_file);
    while let Ok(ti) = deserialize_from::<_, LinkRecord>(&mut reader) {
        if match_ti(search_key, &ti) {
            record.push(ti);
        }
    }
    record
}

fn match_ti(search_key: SearchKey, record: &LinkRecord) -> bool {
    let SearchKey {
        start_time,
        end_time,
        sip,
        dip,
        sport,
        dport,
        protocol,
    } = search_key;
    let stime = date_ts(start_time);
    let etime = date_ts(end_time);

    let match0 = match_stime(record.start_time, stime)
        && match_etime(record.end_time, etime)
        && match_protocol(record.tuple5.trans_proto, protocol);
    let match1 = match_ip(record.tuple5.addr1, sip)
        && match_port(record.tuple5.port1, sport)
        && match_ip(record.tuple5.addr2, dip)
        && match_port(record.tuple5.port2, dport);
    let match2 = match_ip(record.tuple5.addr1, dip)
        && match_port(record.tuple5.port1, dport)
        && match_ip(record.tuple5.addr2, sip)
        && match_port(record.tuple5.port2, sport);
    match0 && (match1 || match2)
}

fn match_stime(rd_start: u128, op_start: Option<u128>) -> bool {
    if let Some(ts) = op_start {
        ts <= rd_start
    } else {
        true
    }
}

fn match_etime(rd_end: u128, op_end: Option<u128>) -> bool {
    if let Some(ts) = op_end {
        ts >= rd_end
    } else {
        true
    }
}

fn match_ip(rd_ip: IpAddr, op_ip: Option<IpAddr>) -> bool {
    if let Some(ip) = op_ip {
        ip == rd_ip
    } else {
        true
    }
}

fn match_protocol(rd_proto: TransProto, op_proto: Option<TransProto>) -> bool {
    if let Some(proto) = op_proto {
        proto == rd_proto
    } else {
        true
    }
}

fn match_port(rd_port: u16, op_port: Option<u16>) -> bool {
    if let Some(port) = op_port {
        port == rd_port
    } else {
        true
    }
}

pub fn search_lr(dir: &Path, tuple5: PacketKey) -> Option<LinkRecord> {
    let mut path = PathBuf::new();
    path.push(dir);
    path.push("timeindex.ti");
    match OpenOptions::new()
        .read(true)
        .write(true)
        .create(false)
        .truncate(false)
        .open(&path)
    {
        Ok(ti_file) => {
            let mut reader = MmapBufReader::new(ti_file);
            while let Ok(ti) = deserialize_from::<_, LinkRecord>(&mut reader) {
                if tuple5 == ti.tuple5 {
                    return Some(ti);
                }
            }
            None
        }
        Err(_e) => None,
    }
}