nsave 0.1.0

capturing and saving packets
Documentation
use capture::*;
use clap::{arg, value_parser, Command};
use common::*;
use configure::*;
use daemonize::Daemonize;
use flow::*;
use libnsave::*;
use packet::Packet;
use std::fs::File;
use std::{
    path::PathBuf,
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc::{self, Receiver, SyncSender, TryRecvError, TrySendError},
        Arc, Barrier,
    },
    thread,
    time::Duration,
};
use store::*;

fn main() {
    let matches = cli().get_matches();

    let conf_file = if let Some(cli_conf) = matches.get_one::<PathBuf>("config") {
        cli_conf.clone()
    } else {
        let home_dir = std::env::var("HOME").expect("Failed to get HOME environment variable");
        let mut path = PathBuf::from(home_dir);
        path.push(DEFAULT_CONFIG_FILE);
        path
    };
    let configure = if let Ok(conf) = Configure::load(&conf_file) {
        conf
    } else {
        println!("need set configure file");
        return;
    };

    if configure.daemon {
        let mut std_out_path = PathBuf::from(configure.store_path.clone());
        std_out_path.push("nsave.out");
        let stdout = File::create(std_out_path).unwrap();
        let mut std_err_path = PathBuf::from(configure.store_path.clone());
        std_err_path.push("nsave.err");
        let stderr = File::create(std_err_path).unwrap();
        let mut pid_file = PathBuf::from(configure.store_path.clone());
        pid_file.push("nsave.pid");
        let work_dir = PathBuf::from(configure.store_path.clone());

        Daemonize::new()
            .pid_file(pid_file)
            .working_directory(work_dir)
            .stdout(stdout)
            .stderr(stderr)
            .start()
            .expect("Failed to start daemon");
    }

    let running = Arc::new(AtomicBool::new(true));
    let r = running.clone();
    ctrlc::set_handler(move || {
        r.store(false, Ordering::Relaxed);
    })
    .expect("Error setting Ctrl-C handler");

    let barrier = Arc::new(Barrier::new((configure.thread_num + 2) as usize));
    let mut statis = (0..configure.thread_num)
        .map(|_| Statis::new())
        .collect::<Vec<Statis>>();
    let mut msg_rxs = vec![];
    let mut pkt_txs = vec![];
    let mut writer_thds = vec![];

    for i in 0..configure.thread_num {
        let barrier_writer = barrier.clone();
        let running_writer = running.clone();
        let (msg_tx, msg_rx) = mpsc::sync_channel::<Msg>(configure.msg_channel_size);
        let (pkt_tx, pkt_rx) = mpsc::sync_channel::<Arc<Packet>>(configure.pkt_channel_size);
        let writer_thd = thread::spawn(move || {
            writer_thread(configure, barrier_writer, running_writer, i, pkt_rx, msg_tx);
        });
        msg_rxs.push(msg_rx);
        pkt_txs.push(pkt_tx);
        writer_thds.push(writer_thd);
    }

    let barrier_clean = barrier.clone();
    let running_clean = running.clone();
    let clean_thd = thread::spawn(move || {
        clean_thread(configure, barrier_clean, running_clean, msg_rxs);
    });

    let mut capture = match Capture::init_capture(configure) {
        Ok(cap) => cap,
        Err(e) => {
            println!("capture error {:?}", e);
            return;
        }
    };

    barrier.wait();
    while running.load(Ordering::Relaxed) {
        let now = timenow();
        let pkt = capture.next_packet(now);
        if pkt.is_err() {
            continue;
        }
        let pkt = pkt.unwrap();
        if pkt.decode().is_err() {
            continue;
        }

        let index = (pkt.hash_value() % configure.thread_num) as usize;
        match &pkt_txs[index].try_send(pkt) {
            Ok(()) => {
                statis[index].send_ok += 1;
            }
            Err(TrySendError::Full(_)) => {
                statis[index].send_err += 1;
            }
            Err(TrySendError::Disconnected(_)) => {
                break;
            }
        }

        if configure.pcap_file.is_some() {
            thread::sleep(Duration::from_millis(1));
        }
    }

    running.store(false, Ordering::Relaxed);
    for writer_thd in writer_thds {
        writer_thd.join().unwrap();
    }
    clean_thd.join().unwrap();
    for i in 0..configure.thread_num {
        let i: usize = i.try_into().unwrap();
        println!(
            "statis[{}] send_ok:{}, send_err:{}",
            i, statis[i].send_ok, statis[i].send_err
        );
    }
    std::process::exit(0);
}

fn cli() -> Command {
    Command::new("nsave")
        .version(VERSION)
        .author(AUTHOR)
        .about("nsave server")
        .arg_required_else_help(false)
        .allow_external_subcommands(true)
        .arg(
            arg!(-c --config <FILE> "Sets a custom config file")
                .value_parser(value_parser!(PathBuf)),
        )
}

fn writer_thread(
    configure: &'static Configure,
    barrier: Arc<Barrier>,
    running: Arc<AtomicBool>,
    writer_id: u64,
    pkt_rx: Receiver<Arc<Packet>>,
    msg_tx: SyncSender<Msg>,
) {
    let mut flow = Box::new(Flow::new_with_arg(
        configure.flow_max_table_capacity,
        configure.flow_node_timeout as u128,
        configure.flow_max_seq_gap,
    ));
    let mut now;
    let mut prev_ts = timenow();
    let mut recv_num: u64 = 0;
    let mut flow_err: u64 = 0;

    let mut data_path = PathBuf::new();
    data_path.push(&configure.store_path);
    data_path.push(format!("{:03}", writer_id));
    let store = Store::new(configure, data_path, msg_tx);
    if let Err(e) = store.init() {
        println!("packet store init error: {}", e);
        return;
    }

    barrier.wait();
    println!("writer: {:?} running...", writer_id);
    while running.load(Ordering::Relaxed) {
        match pkt_rx.try_recv() {
            Ok(pkt) => {
                recv_num += 1;
                now = pkt.timestamp;
                let mut remove_key = None;
                if let Some(node) = flow.get_mut_or_new(&pkt, now) {
                    node.update(&pkt, now);

                    if node.store_ctx.is_none() {
                        node.store_ctx = Some(StoreCtx::new());
                    }
                    if store.store(node, pkt, now).is_err() {
                        break;
                    }

                    if node.is_fin() {
                        remove_key = Some(node.key);
                        let _ = store.link_fin(&node.key, node.start_time, now);
                    }
                } else {
                    flow_err += 1;
                    continue;
                }
                if let Some(key) = remove_key {
                    flow.remove(&key);
                }
            }
            Err(TryRecvError::Empty) => {
                thread::sleep(Duration::from_millis(configure.writer_empty_sleep as u64));
                now = timenow();
            }
            Err(TryRecvError::Disconnected) => {
                println!("writer_thread: {:?} recv disconnected", writer_id);
                break;
            }
        }

        if now > prev_ts + configure.timer_intervel as u128 {
            prev_ts = now;

            if store.timer(now).is_err() {
                break;
            }
            flow.timeout(now, |node| {
                if store.link_fin(&node.key, node.start_time, now).is_err() {
                    println!("store link fin error. node key{:?}", &node.key);
                }
            });
        }
    }
    store.finish();
    println!(
        "writer: {:?} exit. recv pkt: {}, flow_err: {}",
        writer_id, recv_num, flow_err
    );
}

fn clean_thread(
    configure: &'static Configure,
    barrier: Arc<Barrier>,
    running: Arc<AtomicBool>,
    msg_rxs: Vec<Receiver<Msg>>,
) {
    barrier.wait();
    println!("clean thread running...");
    while running.load(Ordering::Relaxed) {
        for msg_rx in msg_rxs.iter() {
            match msg_rx.try_recv() {
                Ok(Msg::CoverChunk(pool_path, end_time)) => {
                    let _ = clean_index_dir(pool_path, ts_date(end_time));
                }
                Err(TryRecvError::Empty) => {
                    thread::sleep(Duration::from_millis(configure.clean_empty_sleep as u64));
                }
                Err(TryRecvError::Disconnected) => {
                    println!("clean_thread: {:?} recv disconnected", msg_rx);
                    return;
                }
            }
        }
    }
    println!("clean thread end...");
}

struct Statis {
    send_ok: usize,
    send_err: usize,
}

impl Statis {
    pub fn new() -> Statis {
        Statis {
            send_ok: 0,
            send_err: 0,
        }
    }
}