1use crate::common::*;
2use crate::configure::*;
3use crate::mmapbuf::*;
4use crate::packet::*;
5use crate::search_ti::*;
6use bincode::deserialize_from;
7use chrono::{Duration, NaiveDateTime};
8use serde::{Deserialize, Serialize};
9use std::borrow::Borrow;
10use std::collections::HashSet;
11use std::fmt;
12use std::hash::{Hash, Hasher};
13use std::net::IpAddr;
14use std::{
15 fs::{File, OpenOptions},
16 path::{Path, PathBuf},
17};
18
19#[derive(Debug)]
20pub struct TimeIndex {
21 configure: &'static Configure,
22 buf_writer: Option<MmapBufWriter>,
23}
24
25impl TimeIndex {
26 pub fn new(configure: &'static Configure) -> Self {
27 TimeIndex {
28 configure,
29 buf_writer: None,
30 }
31 }
32
33 pub fn init_dir(&mut self, dir: &Path) -> Result<(), StoreError> {
34 let mut path = PathBuf::new();
35 path.push(dir);
36 path.push("timeindex.ti");
37 let result = OpenOptions::new()
38 .read(true)
39 .write(true)
40 .create(true)
41 .truncate(true)
42 .open(path);
43 match result {
44 Ok(fd) => {
45 let meta = fd.metadata()?;
46 self.buf_writer = Some(MmapBufWriter::with_arg(
47 fd,
48 meta.len(),
49 self.configure.ti_buff_size,
50 ));
51 }
52 Err(e) => return Err(StoreError::IoError(e)),
53 }
54 Ok(())
55 }
56
57 pub fn change_dir(&mut self) -> Result<(), StoreError> {
58 self.buf_writer = None;
59 Ok(())
60 }
61
62 pub fn write(&mut self, record: LinkRecord) -> Result<u64, StoreError> {
63 let ci_offset = self.buf_writer.borrow().as_ref().unwrap().next_offset();
64 if let Some(ref mut writer) = self.buf_writer {
65 if bincode::serialize_into(writer, &record).is_err() {
66 return Err(StoreError::WriteError("time index write error".to_string()));
67 }
68 }
69 Ok(ci_offset)
70 }
71
72 pub fn finish(&mut self) {
73 self.buf_writer = None;
74 }
75}
76
77#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
78pub struct LinkRecord {
79 pub start_time: u128,
80 pub end_time: u128,
81 pub tuple5: PacketKey,
82 pub ci_offset: u64,
83}
84
85impl PartialEq for LinkRecord {
86 fn eq(&self, other: &Self) -> bool {
87 self.start_time == other.start_time && self.tuple5 == other.tuple5
88 }
89}
90
91impl Eq for LinkRecord {}
92
93impl Hash for LinkRecord {
94 fn hash<H: Hasher>(&self, state: &mut H) {
95 self.start_time.hash(state);
96 self.tuple5.hash(state);
97 }
98}
99
100impl fmt::Display for LinkRecord {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 write!(
103 f,
104 "LinkRecord {{ start_time: {} ({}), end_time: {} ({}), tuple5: {:?}, ci_offset: {} }}",
105 ts_date(self.start_time),
106 self.start_time,
107 ts_date(self.end_time),
108 self.end_time,
109 self.tuple5,
110 self.ci_offset
111 )
112 }
113}
114
115#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
116pub struct SearchKey {
117 pub start_time: Option<NaiveDateTime>,
118 pub end_time: Option<NaiveDateTime>,
119 pub sip: Option<IpAddr>,
120 pub dip: Option<IpAddr>,
121 pub sport: Option<u16>,
122 pub dport: Option<u16>,
123 pub protocol: Option<TransProto>,
124}
125
126impl fmt::Display for SearchKey {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 write!(
129 f,
130 "SearchKey {{ start_time: {:?} ({:?}), end_time: {:?} ({:?}), sport: {:?}, dport: {:?}, protocol: {:?} }}",
131 self.start_time,
132 date_ts(self.start_time),
133 self.end_time,
134 date_ts(self.end_time),
135 self.sport,
136 self.dport,
137 self.protocol
138 )
139 }
140}
141
142pub fn dump_timeindex_file(path: PathBuf) -> Result<(), StoreError> {
143 match path.extension() {
144 Some(ext) => {
145 if !ext.to_str().unwrap().eq("ti") {
146 return Err(StoreError::CliError("not timeindex file".to_string()));
147 }
148 }
149 None => return Err(StoreError::CliError("not timeindex file".to_string())),
150 };
151
152 let file = match OpenOptions::new()
153 .read(true)
154 .write(true)
155 .create(false)
156 .truncate(false)
157 .open(&path)
158 {
159 Ok(fd) => fd,
160 Err(e) => {
161 return Err(StoreError::CliError(format!("open file error: {}", e)));
162 }
163 };
164 let mut mmap_reader = MmapBufReader::new(file);
165 println!("dump timeindex file: {:?}", path);
166 while let Ok(record) = deserialize_from::<_, LinkRecord>(&mut mmap_reader) {
167 println!("{}", record);
168 }
169 Ok(())
170}
171
172pub fn ti_search(
173 configure: &'static Configure,
174 search_key: SearchKey,
175 dir_id: u64,
176) -> Vec<LinkRecord> {
177 let mut ti_set = HashSet::new();
178 let mut search_date = search_key.start_time.unwrap();
179 while search_date < search_key.end_time.unwrap() {
180 if let Ok((ti_file, _ti_file_path)) = date2ti_file(configure, search_date, dir_id) {
181 ti_set = search_ti(search_key, ti_file)
182 .into_iter()
183 .collect::<HashSet<_>>();
184 }
185 search_date += Duration::try_minutes(1).unwrap();
186 }
187 ti_set.into_iter().collect()
188}
189
190fn search_ti(search_key: SearchKey, ti_file: File) -> Vec<LinkRecord> {
191 let mut record: Vec<LinkRecord> = Vec::new();
192 let mut reader = MmapBufReader::new(ti_file);
193 while let Ok(ti) = deserialize_from::<_, LinkRecord>(&mut reader) {
194 if match_ti(search_key, &ti) {
195 record.push(ti);
196 }
197 }
198 record
199}
200
201fn match_ti(search_key: SearchKey, record: &LinkRecord) -> bool {
202 let SearchKey {
203 start_time,
204 end_time,
205 sip,
206 dip,
207 sport,
208 dport,
209 protocol,
210 } = search_key;
211 let stime = date_ts(start_time);
212 let etime = date_ts(end_time);
213
214 let match0 = match_stime(record.start_time, stime)
215 && match_etime(record.end_time, etime)
216 && match_protocol(record.tuple5.trans_proto, protocol);
217 let match1 = match_ip(record.tuple5.addr1, sip)
218 && match_port(record.tuple5.port1, sport)
219 && match_ip(record.tuple5.addr2, dip)
220 && match_port(record.tuple5.port2, dport);
221 let match2 = match_ip(record.tuple5.addr1, dip)
222 && match_port(record.tuple5.port1, dport)
223 && match_ip(record.tuple5.addr2, sip)
224 && match_port(record.tuple5.port2, sport);
225 match0 && (match1 || match2)
226}
227
228fn match_stime(rd_start: u128, op_start: Option<u128>) -> bool {
229 if let Some(ts) = op_start {
230 ts <= rd_start
231 } else {
232 true
233 }
234}
235
236fn match_etime(rd_end: u128, op_end: Option<u128>) -> bool {
237 if let Some(ts) = op_end {
238 ts >= rd_end
239 } else {
240 true
241 }
242}
243
244fn match_ip(rd_ip: IpAddr, op_ip: Option<IpAddr>) -> bool {
245 if let Some(ip) = op_ip {
246 ip == rd_ip
247 } else {
248 true
249 }
250}
251
252fn match_protocol(rd_proto: TransProto, op_proto: Option<TransProto>) -> bool {
253 if let Some(proto) = op_proto {
254 proto == rd_proto
255 } else {
256 true
257 }
258}
259
260fn match_port(rd_port: u16, op_port: Option<u16>) -> bool {
261 if let Some(port) = op_port {
262 port == rd_port
263 } else {
264 true
265 }
266}
267
268pub fn search_lr(dir: &Path, tuple5: PacketKey) -> Option<LinkRecord> {
269 let mut path = PathBuf::new();
270 path.push(dir);
271 path.push("timeindex.ti");
272 match OpenOptions::new()
273 .read(true)
274 .write(true)
275 .create(false)
276 .truncate(false)
277 .open(&path)
278 {
279 Ok(ti_file) => {
280 let mut reader = MmapBufReader::new(ti_file);
281 while let Ok(ti) = deserialize_from::<_, LinkRecord>(&mut reader) {
282 if tuple5 == ti.tuple5 {
283 return Some(ti);
284 }
285 }
286 None
287 }
288 Err(_e) => None,
289 }
290}