libnsave/
store.rs

1use crate::chunkindex::*;
2use crate::chunkpool::*;
3use crate::common::*;
4use crate::configure::*;
5use crate::flow::FlowNode;
6use crate::packet::*;
7use crate::timeindex::*;
8use chrono::{DateTime, Datelike, Local, Timelike};
9use std::{
10    cell::RefCell,
11    fs,
12    path::{Path, PathBuf},
13    sync::{mpsc::SyncSender, Arc},
14};
15
/// Per-flow bookkeeping that `Store::store` reads and updates through the flow
/// node's `store_ctx`.
///
/// Every field is wrapped in a `RefCell` so it can be changed through a shared
/// reference to the context.
#[derive(Debug)]
pub struct StoreCtx {
    /// Offset returned by the chunk pool for the most recently stored packet.
    prev_pkt_offset: RefCell<ChunkOffset>,
    /// Key of the flow; stays `None` until the first chunk-index record is written.
    flow_key: RefCell<Option<PacketKey>>,
    /// Id of the chunk named in the latest chunk-index record written for the flow.
    chunk_id: RefCell<Option<u32>>,

    /// Becomes `true` once a time-index record has been written for the flow.
    ti_write: RefCell<bool>,
}
24
25impl StoreCtx {
26    pub fn new() -> Self {
27        StoreCtx {
28            prev_pkt_offset: RefCell::new(ChunkOffset::new()),
29            flow_key: RefCell::new(None),
30            chunk_id: RefCell::new(None),
31            ti_write: RefCell::new(false),
32        }
33    }
34}
35
36impl Default for StoreCtx {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
/// Writes packets into the chunk pool and keeps the chunk index and time index
/// up to date inside a per-minute directory tree under `store_dir`.
///
/// Mutable state lives in `RefCell`s so every method can take `&self`.
#[derive(Debug)]
pub struct Store {
    /// Root directory under which the `YYYY/MM/DD/HH/mm` index directories are created.
    store_dir: PathBuf,
    /// Index directory currently in use; `None` before the first write and after
    /// `timer` detects that the minute has changed.
    current_dir: RefCell<Option<PathBuf>>,
    /// Local date/time from which `current_dir` was derived.
    current_dir_date: RefCell<DateTime<Local>>,
    /// Pool that packets are written into.
    chunk_pool: ChunkPool,
    /// Channel on which `Msg::CoverChunk` notifications are sent (best effort).
    msg_channel: SyncSender<Msg>,
    /// Index receiving `ChunkIndexRd` records.
    chunk_index: RefCell<ChunkIndex>,
    /// Index receiving `LinkRecord` records.
    time_index: RefCell<TimeIndex>,
}
52
53impl Store {
54    pub fn new(
55        configure: &'static Configure,
56        store_dir: PathBuf,
57        msg_channel: SyncSender<Msg>,
58    ) -> Self {
59        Store {
60            store_dir: store_dir.clone(),
61            current_dir: RefCell::new(None),
62            current_dir_date: RefCell::new(ts_date(0)),
63            chunk_pool: ChunkPool::new(
64                store_dir,
65                configure.pool_size,
66                configure.file_size,
67                configure.chunk_size,
68            ),
69            msg_channel,
70            chunk_index: RefCell::new(ChunkIndex::new(configure)),
71            time_index: RefCell::new(TimeIndex::new(configure)),
72        }
73    }
74
75    pub fn init(&self) -> Result<(), StoreError> {
76        self.chunk_pool.init()?;
77        Ok(())
78    }
79
80    pub fn store(
81        &self,
82        flow_node: &FlowNode,
83        pkt: Arc<Packet>,
84        now: u128,
85    ) -> Result<(), StoreError> {
86        self.init_dir(now)?;
87
88        let ctx = flow_node.store_ctx.as_ref().unwrap();
89        let pkt_offset = self.chunk_pool.write(pkt, now, |pool_path, end_time| {
90            let msg = Msg::CoverChunk(pool_path, end_time);
91            let _ = self.msg_channel.try_send(msg);
92        })?;
93        if ctx.prev_pkt_offset.borrow().chunk_id == pkt_offset.chunk_id {
94            self.chunk_pool
95                .update(&ctx.prev_pkt_offset.borrow(), &pkt_offset)?;
96        }
97        *ctx.prev_pkt_offset.borrow_mut() = pkt_offset;
98
99        let ci_offset = if ctx.flow_key.borrow().is_none()
100            || pkt_offset.chunk_id != ctx.chunk_id.borrow().unwrap()
101        {
102            *ctx.flow_key.borrow_mut() = Some(flow_node.key);
103            *ctx.chunk_id.borrow_mut() = Some(pkt_offset.chunk_id);
104            let offset = self.chunk_index.borrow_mut().write(ChunkIndexRd {
105                start_time: flow_node.start_time,
106                end_time: 0,
107                chunk_id: pkt_offset.chunk_id,
108                chunk_offset: pkt_offset.start_offset,
109                tuple5: ctx.flow_key.borrow().unwrap(),
110            })?;
111            Some(offset)
112        } else {
113            None
114        };
115
116        if let Some(offset) = ci_offset {
117            let mut ti_write = ctx.ti_write.borrow_mut();
118            if !*ti_write {
119                *ti_write = true;
120                self.time_index.borrow_mut().write(LinkRecord {
121                    start_time: flow_node.start_time,
122                    end_time: 0,
123                    tuple5: ctx.flow_key.borrow().unwrap(),
124                    ci_offset: offset,
125                })?;
126            }
127        }
128        Ok(())
129    }
130
131    pub fn link_fin(
132        &self,
133        tuple5: &PacketKey,
134        start_time: u128,
135        now: u128,
136    ) -> Result<(), StoreError> {
137        self.init_dir(now)?;
138
139        let ci_offset = self.chunk_index.borrow_mut().write(ChunkIndexRd {
140            start_time,
141            end_time: now,
142            chunk_id: 0,
143            chunk_offset: 0,
144            tuple5: *tuple5,
145        });
146
147        if let Ok(offset) = ci_offset {
148            self.time_index.borrow_mut().write(LinkRecord {
149                start_time,
150                end_time: now,
151                tuple5: *tuple5,
152                ci_offset: offset,
153            })?;
154        }
155        Ok(())
156    }
157
158    pub fn timer(&self, now: u128) -> Result<(), StoreError> {
159        if self.current_dir.borrow().is_none() {
160            return Ok(());
161        }
162
163        let date_now = ts_date(now);
164        let cur_dir_date = *self.current_dir_date.borrow();
165        if !(date_now.year() == cur_dir_date.year()
166            && date_now.month() == cur_dir_date.month()
167            && date_now.day() == cur_dir_date.day()
168            && date_now.hour() == cur_dir_date.hour()
169            && date_now.minute() == cur_dir_date.minute())
170        {
171            self.chunk_index.borrow_mut().change_dir()?;
172            self.time_index.borrow_mut().change_dir()?;
173
174            *self.current_dir.borrow_mut() = None;
175        }
176        Ok(())
177    }
178
179    pub fn finish(&self) {
180        self.chunk_pool.finish();
181        self.chunk_index.borrow_mut().finish();
182        self.time_index.borrow_mut().finish();
183    }
184
185    fn init_dir(&self, now: u128) -> Result<(), StoreError> {
186        if self.current_dir.borrow().is_none() {
187            self.mk_time_dir(now)?;
188            self.chunk_index
189                .borrow_mut()
190                .init_dir(self.current_dir.borrow().as_ref().unwrap())?;
191            self.time_index
192                .borrow_mut()
193                .init_dir(self.current_dir.borrow().as_ref().unwrap())?;
194        }
195        Ok(())
196    }
197
198    fn mk_time_dir(&self, timestamp: u128) -> Result<(), StoreError> {
199        let date = ts_date(timestamp);
200        let mut path = PathBuf::new();
201        path.push(&self.store_dir);
202        path.push(format!("{:04}", date.year()));
203        path.push(format!("{:02}", date.month()));
204        path.push(format!("{:02}", date.day()));
205        path.push(format!("{:02}", date.hour()));
206        path.push(format!("{:02}", date.minute()));
207
208        if !path.exists() && fs::create_dir_all(&path).is_err() {
209            return Err(StoreError::WriteError("create dir error".to_string()));
210        }
211
212        *self.current_dir.borrow_mut() = Some(path);
213        *self.current_dir_date.borrow_mut() = date;
214        Ok(())
215    }
216}
217
218pub fn clean_index_dir(pool_path: PathBuf, end_date: DateTime<Local>) -> Result<(), StoreError> {
219    let now_date = ts_date(timenow());
220    if now_date.minute() == end_date.minute() {
221        return Ok(());
222    }
223
224    clean_minute_dir(&pool_path, end_date)?;
225    clean_hour_dir(&pool_path, end_date)?;
226    clean_day_dir(&pool_path, end_date)?;
227    clean_month_dir(&pool_path, end_date)?;
228    clean_year_dir(&pool_path, end_date)?;
229    Ok(())
230}
231
232fn clean_minute_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
233    for minute in (0..=end_date.minute()).rev() {
234        let mut path = PathBuf::new();
235        path.push(pool_path);
236        path.pop();
237        path.push(format!("{:04}", end_date.year()));
238        path.push(format!("{:02}", end_date.month()));
239        path.push(format!("{:02}", end_date.day()));
240        path.push(format!("{:02}", end_date.hour()));
241        path.push(format!("{:02}", minute));
242        if path.exists() {
243            println!("minute path: {:?}", path);
244            fs::remove_dir_all(path)?;
245        }
246    }
247    Ok(())
248}
249
250fn clean_hour_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
251    let mut path = PathBuf::new();
252    path.push(pool_path);
253    path.pop();
254    path.push(format!("{:04}", end_date.year()));
255    path.push(format!("{:02}", end_date.month()));
256    path.push(format!("{:02}", end_date.day()));
257    path.push(format!("{:02}", end_date.hour()));
258    if is_empty_dir(&path) {
259        println!("hour path: {:?}", path);
260        fs::remove_dir_all(path)?;
261    }
262
263    for hour in (0..end_date.hour()).rev() {
264        let mut path = PathBuf::new();
265        path.push(pool_path);
266        path.pop();
267        path.push(format!("{:04}", end_date.year()));
268        path.push(format!("{:02}", end_date.month()));
269        path.push(format!("{:02}", end_date.day()));
270        path.push(format!("{:02}", hour));
271        if path.exists() {
272            println!("hour before path: {:?}", path);
273            fs::remove_dir_all(path)?;
274        }
275    }
276    Ok(())
277}
278
279fn clean_day_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
280    let mut path = PathBuf::new();
281    path.push(pool_path);
282    path.pop();
283    path.push(format!("{:04}", end_date.year()));
284    path.push(format!("{:02}", end_date.month()));
285    path.push(format!("{:02}", end_date.day()));
286    if is_empty_dir(&path) {
287        println!("day path: {:?}", path);
288        fs::remove_dir_all(path)?;
289    }
290
291    for day in (1..end_date.day()).rev() {
292        let mut path = PathBuf::new();
293        path.push(pool_path);
294        path.pop();
295        path.push(format!("{:04}", end_date.year()));
296        path.push(format!("{:02}", end_date.month()));
297        path.push(format!("{:02}", day));
298        if path.exists() {
299            println!("day before path: {:?}", path);
300            fs::remove_dir_all(path)?;
301        }
302    }
303    Ok(())
304}
305
306fn clean_month_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
307    let mut path = PathBuf::new();
308    path.push(pool_path);
309    path.pop();
310    path.push(format!("{:04}", end_date.year()));
311    path.push(format!("{:02}", end_date.month()));
312    if is_empty_dir(&path) {
313        println!("moutn path: {:?}", path);
314        fs::remove_dir_all(path)?;
315    }
316
317    for month in (1..end_date.month()).rev() {
318        let mut path = PathBuf::new();
319        path.push(pool_path);
320        path.pop();
321        path.push(format!("{:04}", end_date.year()));
322        path.push(format!("{:02}", month));
323        if path.exists() {
324            println!("month before path: {:?}", path);
325            fs::remove_dir_all(path)?;
326        }
327    }
328    Ok(())
329}
330
331fn clean_year_dir(pool_path: &Path, end_date: DateTime<Local>) -> Result<(), StoreError> {
332    let mut path = PathBuf::new();
333    path.push(pool_path);
334    path.pop();
335    path.push(format!("{:04}", end_date.year()));
336    if is_empty_dir(&path) {
337        fs::remove_dir_all(path)?;
338    }
339
340    let mut path = PathBuf::new();
341    path.push(pool_path);
342    path.pop();
343    path.push(format!("{:04}", end_date.year() - 1));
344    if path.exists() {
345        fs::remove_dir_all(path)?;
346    }
347    Ok(())
348}
349
/// Returns `true` only when `dir_path` can be listed and has no entries.
///
/// A directory that cannot be read (missing, not a directory, no permission,
/// ...) is not considered empty, so the result is `false`.
fn is_empty_dir(dir_path: &Path) -> bool {
    fs::read_dir(dir_path).map_or(false, |mut entries| entries.next().is_none())
}
356}