libnsave/
timeindex.rs

1use crate::common::*;
2use crate::configure::*;
3use crate::mmapbuf::*;
4use crate::packet::*;
5use crate::search_ti::*;
6use bincode::deserialize_from;
7use chrono::{Duration, NaiveDateTime};
8use serde::{Deserialize, Serialize};
9use std::borrow::Borrow;
10use std::collections::HashSet;
11use std::fmt;
12use std::hash::{Hash, Hasher};
13use std::net::IpAddr;
14use std::{
15    fs::{File, OpenOptions},
16    path::{Path, PathBuf},
17};
18
19#[derive(Debug)]
20pub struct TimeIndex {
21    configure: &'static Configure,
22    buf_writer: Option<MmapBufWriter>,
23}
24
25impl TimeIndex {
26    pub fn new(configure: &'static Configure) -> Self {
27        TimeIndex {
28            configure,
29            buf_writer: None,
30        }
31    }
32
33    pub fn init_dir(&mut self, dir: &Path) -> Result<(), StoreError> {
34        let mut path = PathBuf::new();
35        path.push(dir);
36        path.push("timeindex.ti");
37        let result = OpenOptions::new()
38            .read(true)
39            .write(true)
40            .create(true)
41            .truncate(true)
42            .open(path);
43        match result {
44            Ok(fd) => {
45                let meta = fd.metadata()?;
46                self.buf_writer = Some(MmapBufWriter::with_arg(
47                    fd,
48                    meta.len(),
49                    self.configure.ti_buff_size,
50                ));
51            }
52            Err(e) => return Err(StoreError::IoError(e)),
53        }
54        Ok(())
55    }
56
57    pub fn change_dir(&mut self) -> Result<(), StoreError> {
58        self.buf_writer = None;
59        Ok(())
60    }
61
62    pub fn write(&mut self, record: LinkRecord) -> Result<u64, StoreError> {
63        let ci_offset = self.buf_writer.borrow().as_ref().unwrap().next_offset();
64        if let Some(ref mut writer) = self.buf_writer {
65            if bincode::serialize_into(writer, &record).is_err() {
66                return Err(StoreError::WriteError("time index write error".to_string()));
67            }
68        }
69        Ok(ci_offset)
70    }
71
72    pub fn finish(&mut self) {
73        self.buf_writer = None;
74    }
75}
76
77#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
78pub struct LinkRecord {
79    pub start_time: u128,
80    pub end_time: u128,
81    pub tuple5: PacketKey,
82    pub ci_offset: u64,
83}
84
85impl PartialEq for LinkRecord {
86    fn eq(&self, other: &Self) -> bool {
87        self.start_time == other.start_time && self.tuple5 == other.tuple5
88    }
89}
90
91impl Eq for LinkRecord {}
92
93impl Hash for LinkRecord {
94    fn hash<H: Hasher>(&self, state: &mut H) {
95        self.start_time.hash(state);
96        self.tuple5.hash(state);
97    }
98}
99
100impl fmt::Display for LinkRecord {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        write!(
103            f,
104            "LinkRecord {{ start_time: {} ({}), end_time: {} ({}), tuple5: {:?}, ci_offset: {} }}",
105            ts_date(self.start_time),
106            self.start_time,
107            ts_date(self.end_time),
108            self.end_time,
109            self.tuple5,
110            self.ci_offset
111        )
112    }
113}
114
115#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
116pub struct SearchKey {
117    pub start_time: Option<NaiveDateTime>,
118    pub end_time: Option<NaiveDateTime>,
119    pub sip: Option<IpAddr>,
120    pub dip: Option<IpAddr>,
121    pub sport: Option<u16>,
122    pub dport: Option<u16>,
123    pub protocol: Option<TransProto>,
124}
125
126impl fmt::Display for SearchKey {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        write!(
129            f,
130            "SearchKey {{ start_time: {:?} ({:?}), end_time: {:?} ({:?}), sport: {:?}, dport: {:?}, protocol: {:?} }}",
131            self.start_time,
132            date_ts(self.start_time),
133            self.end_time,
134            date_ts(self.end_time),
135            self.sport,
136            self.dport,
137            self.protocol
138        )
139    }
140}
141
142pub fn dump_timeindex_file(path: PathBuf) -> Result<(), StoreError> {
143    match path.extension() {
144        Some(ext) => {
145            if !ext.to_str().unwrap().eq("ti") {
146                return Err(StoreError::CliError("not timeindex file".to_string()));
147            }
148        }
149        None => return Err(StoreError::CliError("not timeindex file".to_string())),
150    };
151
152    let file = match OpenOptions::new()
153        .read(true)
154        .write(true)
155        .create(false)
156        .truncate(false)
157        .open(&path)
158    {
159        Ok(fd) => fd,
160        Err(e) => {
161            return Err(StoreError::CliError(format!("open file error: {}", e)));
162        }
163    };
164    let mut mmap_reader = MmapBufReader::new(file);
165    println!("dump timeindex file: {:?}", path);
166    while let Ok(record) = deserialize_from::<_, LinkRecord>(&mut mmap_reader) {
167        println!("{}", record);
168    }
169    Ok(())
170}
171
172pub fn ti_search(
173    configure: &'static Configure,
174    search_key: SearchKey,
175    dir_id: u64,
176) -> Vec<LinkRecord> {
177    let mut ti_set = HashSet::new();
178    let mut search_date = search_key.start_time.unwrap();
179    while search_date < search_key.end_time.unwrap() {
180        if let Ok((ti_file, _ti_file_path)) = date2ti_file(configure, search_date, dir_id) {
181            ti_set = search_ti(search_key, ti_file)
182                .into_iter()
183                .collect::<HashSet<_>>();
184        }
185        search_date += Duration::try_minutes(1).unwrap();
186    }
187    ti_set.into_iter().collect()
188}
189
190fn search_ti(search_key: SearchKey, ti_file: File) -> Vec<LinkRecord> {
191    let mut record: Vec<LinkRecord> = Vec::new();
192    let mut reader = MmapBufReader::new(ti_file);
193    while let Ok(ti) = deserialize_from::<_, LinkRecord>(&mut reader) {
194        if match_ti(search_key, &ti) {
195            record.push(ti);
196        }
197    }
198    record
199}
200
201fn match_ti(search_key: SearchKey, record: &LinkRecord) -> bool {
202    let SearchKey {
203        start_time,
204        end_time,
205        sip,
206        dip,
207        sport,
208        dport,
209        protocol,
210    } = search_key;
211    let stime = date_ts(start_time);
212    let etime = date_ts(end_time);
213
214    let match0 = match_stime(record.start_time, stime)
215        && match_etime(record.end_time, etime)
216        && match_protocol(record.tuple5.trans_proto, protocol);
217    let match1 = match_ip(record.tuple5.addr1, sip)
218        && match_port(record.tuple5.port1, sport)
219        && match_ip(record.tuple5.addr2, dip)
220        && match_port(record.tuple5.port2, dport);
221    let match2 = match_ip(record.tuple5.addr1, dip)
222        && match_port(record.tuple5.port1, dport)
223        && match_ip(record.tuple5.addr2, sip)
224        && match_port(record.tuple5.port2, sport);
225    match0 && (match1 || match2)
226}
227
228fn match_stime(rd_start: u128, op_start: Option<u128>) -> bool {
229    if let Some(ts) = op_start {
230        ts <= rd_start
231    } else {
232        true
233    }
234}
235
236fn match_etime(rd_end: u128, op_end: Option<u128>) -> bool {
237    if let Some(ts) = op_end {
238        ts >= rd_end
239    } else {
240        true
241    }
242}
243
244fn match_ip(rd_ip: IpAddr, op_ip: Option<IpAddr>) -> bool {
245    if let Some(ip) = op_ip {
246        ip == rd_ip
247    } else {
248        true
249    }
250}
251
252fn match_protocol(rd_proto: TransProto, op_proto: Option<TransProto>) -> bool {
253    if let Some(proto) = op_proto {
254        proto == rd_proto
255    } else {
256        true
257    }
258}
259
260fn match_port(rd_port: u16, op_port: Option<u16>) -> bool {
261    if let Some(port) = op_port {
262        port == rd_port
263    } else {
264        true
265    }
266}
267
268pub fn search_lr(dir: &Path, tuple5: PacketKey) -> Option<LinkRecord> {
269    let mut path = PathBuf::new();
270    path.push(dir);
271    path.push("timeindex.ti");
272    match OpenOptions::new()
273        .read(true)
274        .write(true)
275        .create(false)
276        .truncate(false)
277        .open(&path)
278    {
279        Ok(ti_file) => {
280            let mut reader = MmapBufReader::new(ti_file);
281            while let Ok(ti) = deserialize_from::<_, LinkRecord>(&mut reader) {
282                if tuple5 == ti.tuple5 {
283                    return Some(ti);
284                }
285            }
286            None
287        }
288        Err(_e) => None,
289    }
290}