1use crate::chunkindex::*;
2use crate::chunkpool::*;
3use crate::common::*;
4use crate::configure::*;
5use crate::flow::FlowNode;
6use crate::packet::*;
7use crate::timeindex::*;
8use chrono::{DateTime, Datelike, Local, Timelike};
9use std::{
10 cell::RefCell,
11 fs,
12 path::{Path, PathBuf},
13 sync::{mpsc::SyncSender, Arc},
14};
15
16#[derive(Debug)]
17pub struct StoreCtx {
18 prev_pkt_offset: RefCell<ChunkOffset>,
19 flow_key: RefCell<Option<PacketKey>>,
20 chunk_id: RefCell<Option<u32>>,
21
22 ti_write: RefCell<bool>,
23}
24
25impl StoreCtx {
26 pub fn new() -> Self {
27 StoreCtx {
28 prev_pkt_offset: RefCell::new(ChunkOffset::new()),
29 flow_key: RefCell::new(None),
30 chunk_id: RefCell::new(None),
31 ti_write: RefCell::new(false),
32 }
33 }
34}
35
36impl Default for StoreCtx {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42#[derive(Debug)]
43pub struct Store {
44 store_dir: PathBuf,
45 current_dir: RefCell<Option<PathBuf>>,
46 current_dir_date: RefCell<DateTime<Local>>,
47 chunk_pool: ChunkPool,
48 msg_channel: SyncSender<Msg>,
49 chunk_index: RefCell<ChunkIndex>,
50 time_index: RefCell<TimeIndex>,
51}
52
53impl Store {
54 pub fn new(
55 configure: &'static Configure,
56 store_dir: PathBuf,
57 msg_channel: SyncSender<Msg>,
58 ) -> Self {
59 Store {
60 store_dir: store_dir.clone(),
61 current_dir: RefCell::new(None),
62 current_dir_date: RefCell::new(ts_date(0)),
63 chunk_pool: ChunkPool::new(
64 store_dir,
65 configure.pool_size,
66 configure.file_size,
67 configure.chunk_size,
68 ),
69 msg_channel,
70 chunk_index: RefCell::new(ChunkIndex::new(configure)),
71 time_index: RefCell::new(TimeIndex::new(configure)),
72 }
73 }
74
75 pub fn init(&self) -> Result<(), StoreError> {
76 self.chunk_pool.init()?;
77 Ok(())
78 }
79
80 pub fn store(
81 &self,
82 flow_node: &FlowNode,
83 pkt: Arc<Packet>,
84 now: u128,
85 ) -> Result<(), StoreError> {
86 self.init_dir(now)?;
87
88 let ctx = flow_node.store_ctx.as_ref().unwrap();
89 let pkt_offset = self.chunk_pool.write(pkt, now, |pool_path, end_time| {
90 let msg = Msg::CoverChunk(pool_path, end_time);
91 let _ = self.msg_channel.try_send(msg);
92 })?;
93 if ctx.prev_pkt_offset.borrow().chunk_id == pkt_offset.chunk_id {
94 self.chunk_pool
95 .update(&ctx.prev_pkt_offset.borrow(), &pkt_offset)?;
96 }
97 *ctx.prev_pkt_offset.borrow_mut() = pkt_offset;
98
99 let ci_offset = if ctx.flow_key.borrow().is_none()
100 || pkt_offset.chunk_id != ctx.chunk_id.borrow().unwrap()
101 {
102 *ctx.flow_key.borrow_mut() = Some(flow_node.key);
103 *ctx.chunk_id.borrow_mut() = Some(pkt_offset.chunk_id);
104 let offset = self.chunk_index.borrow_mut().write(ChunkIndexRd {
105 start_time: flow_node.start_time,
106 end_time: 0,
107 chunk_id: pkt_offset.chunk_id,
108 chunk_offset: pkt_offset.start_offset,
109 tuple5: ctx.flow_key.borrow().unwrap(),
110 })?;
111 Some(offset)
112 } else {
113 None
114 };
115
116 if let Some(offset) = ci_offset {
117 let mut ti_write = ctx.ti_write.borrow_mut();
118 if !*ti_write {
119 *ti_write = true;
120 self.time_index.borrow_mut().write(LinkRecord {
121 start_time: flow_node.start_time,
122 end_time: 0,
123 tuple5: ctx.flow_key.borrow().unwrap(),
124 ci_offset: offset,
125 })?;
126 }
127 }
128 Ok(())
129 }
130
131 pub fn link_fin(
132 &self,
133 tuple5: &PacketKey,
134 start_time: u128,
135 now: u128,
136 ) -> Result<(), StoreError> {
137 self.init_dir(now)?;
138
139 let ci_offset = self.chunk_index.borrow_mut().write(ChunkIndexRd {
140 start_time,
141 end_time: now,
142 chunk_id: 0,
143 chunk_offset: 0,
144 tuple5: *tuple5,
145 });
146
147 if let Ok(offset) = ci_offset {
148 self.time_index.borrow_mut().write(LinkRecord {
149 start_time,
150 end_time: now,
151 tuple5: *tuple5,
152 ci_offset: offset,
153 })?;
154 }
155 Ok(())
156 }
157
158 pub fn timer(&self, now: u128) -> Result<(), StoreError> {
159 if self.current_dir.borrow().is_none() {
160 return Ok(());
161 }
162
163 let date_now = ts_date(now);
164 let cur_dir_date = *self.current_dir_date.borrow();
165 if !(date_now.year() == cur_dir_date.year()
166 && date_now.month() == cur_dir_date.month()
167 && date_now.day() == cur_dir_date.day()
168 && date_now.hour() == cur_dir_date.hour()
169 && date_now.minute() == cur_dir_date.minute())
170 {
171 self.chunk_index.borrow_mut().change_dir()?;
172 self.time_index.borrow_mut().change_dir()?;
173
174 *self.current_dir.borrow_mut() = None;
175 }
176 Ok(())
177 }
178
179 pub fn finish(&self) {
180 self.chunk_pool.finish();
181 self.chunk_index.borrow_mut().finish();
182 self.time_index.borrow_mut().finish();
183 }
184
185 fn init_dir(&self, now: u128) -> Result<(), StoreError> {
186 if self.current_dir.borrow().is_none() {
187 self.mk_time_dir(now)?;
188 self.chunk_index
189 .borrow_mut()
190 .init_dir(self.current_dir.borrow().as_ref().unwrap())?;
191 self.time_index
192 .borrow_mut()
193 .init_dir(self.current_dir.borrow().as_ref().unwrap())?;
194 }
195 Ok(())
196 }
197
198 fn mk_time_dir(&self, timestamp: u128) -> Result<(), StoreError> {
199 let date = ts_date(timestamp);
200 let mut path = PathBuf::new();
201 path.push(&self.store_dir);
202 path.push(format!("{:04}", date.year()));
203 path.push(format!("{:02}", date.month()));
204 path.push(format!("{:02}", date.day()));
205 path.push(format!("{:02}", date.hour()));
206 path.push(format!("{:02}", date.minute()));
207
208 if !path.exists() && fs::create_dir_all(&path).is_err() {
209 return Err(StoreError::WriteError("create dir error".to_string()));
210 }
211
212 *self.current_dir.borrow_mut() = Some(path);
213 *self.current_dir_date.borrow_mut() = date;
214 Ok(())
215 }
216}
217
218pub fn clean_index_dir(pool_path: PathBuf, end_date: DateTime<Local>) -> Result<(), StoreError> {
219 let now_date = ts_date(timenow());
220 if now_date.minute() == end_date.minute() {
221 return Ok(());
222 }
223
224 clean_minute_dir(&pool_path, end_date)?;
225 clean_hour_dir(&pool_path, end_date)?;
226 clean_day_dir(&pool_path, end_date)?;
227 clean_month_dir(&pool_path, end_date)?;
228 clean_year_dir(&pool_path, end_date)?;
229 Ok(())
230}
231
232fn clean_minute_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
233 for minute in (0..=end_date.minute()).rev() {
234 let mut path = PathBuf::new();
235 path.push(pool_path);
236 path.pop();
237 path.push(format!("{:04}", end_date.year()));
238 path.push(format!("{:02}", end_date.month()));
239 path.push(format!("{:02}", end_date.day()));
240 path.push(format!("{:02}", end_date.hour()));
241 path.push(format!("{:02}", minute));
242 if path.exists() {
243 println!("minute path: {:?}", path);
244 fs::remove_dir_all(path)?;
245 }
246 }
247 Ok(())
248}
249
250fn clean_hour_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
251 let mut path = PathBuf::new();
252 path.push(pool_path);
253 path.pop();
254 path.push(format!("{:04}", end_date.year()));
255 path.push(format!("{:02}", end_date.month()));
256 path.push(format!("{:02}", end_date.day()));
257 path.push(format!("{:02}", end_date.hour()));
258 if is_empty_dir(&path) {
259 println!("hour path: {:?}", path);
260 fs::remove_dir_all(path)?;
261 }
262
263 for hour in (0..end_date.hour()).rev() {
264 let mut path = PathBuf::new();
265 path.push(pool_path);
266 path.pop();
267 path.push(format!("{:04}", end_date.year()));
268 path.push(format!("{:02}", end_date.month()));
269 path.push(format!("{:02}", end_date.day()));
270 path.push(format!("{:02}", hour));
271 if path.exists() {
272 println!("hour before path: {:?}", path);
273 fs::remove_dir_all(path)?;
274 }
275 }
276 Ok(())
277}
278
279fn clean_day_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
280 let mut path = PathBuf::new();
281 path.push(pool_path);
282 path.pop();
283 path.push(format!("{:04}", end_date.year()));
284 path.push(format!("{:02}", end_date.month()));
285 path.push(format!("{:02}", end_date.day()));
286 if is_empty_dir(&path) {
287 println!("day path: {:?}", path);
288 fs::remove_dir_all(path)?;
289 }
290
291 for day in (1..end_date.day()).rev() {
292 let mut path = PathBuf::new();
293 path.push(pool_path);
294 path.pop();
295 path.push(format!("{:04}", end_date.year()));
296 path.push(format!("{:02}", end_date.month()));
297 path.push(format!("{:02}", day));
298 if path.exists() {
299 println!("day before path: {:?}", path);
300 fs::remove_dir_all(path)?;
301 }
302 }
303 Ok(())
304}
305
306fn clean_month_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
307 let mut path = PathBuf::new();
308 path.push(pool_path);
309 path.pop();
310 path.push(format!("{:04}", end_date.year()));
311 path.push(format!("{:02}", end_date.month()));
312 if is_empty_dir(&path) {
313 println!("moutn path: {:?}", path);
314 fs::remove_dir_all(path)?;
315 }
316
317 for month in (1..end_date.month()).rev() {
318 let mut path = PathBuf::new();
319 path.push(pool_path);
320 path.pop();
321 path.push(format!("{:04}", end_date.year()));
322 path.push(format!("{:02}", month));
323 if path.exists() {
324 println!("month before path: {:?}", path);
325 fs::remove_dir_all(path)?;
326 }
327 }
328 Ok(())
329}
330
331fn clean_year_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
332 let mut path = PathBuf::new();
333 path.push(pool_path);
334 path.pop();
335 path.push(format!("{:04}", end_date.year()));
336 if is_empty_dir(&path) {
337 fs::remove_dir_all(path)?;
338 }
339
340 let mut path = PathBuf::new();
341 path.push(pool_path);
342 path.pop();
343 path.push(format!("{:04}", end_date.year() - 1));
344 if path.exists() {
345 fs::remove_dir_all(path)?;
346 }
347 Ok(())
348}
349
350fn is_empty_dir(dir_path: &Path) -> bool {
351 let mut entries = match fs::read_dir(dir_path) {
352 Ok(entries) => entries,
353 Err(_) => return false, };
355 entries.next().is_none()
356}