Skip to main content

v_common/module/
info.rs

1use crc32fast::Hasher;
2use std::fs::*;
3use std::io::{BufRead, BufReader};
4use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
5use std::path::Path;
6use std::time::SystemTime;
7
8#[derive(Debug)]
9pub struct ModuleInfo {
10    _base_path: String,
11    name: String,
12    ff_info: File,
13    is_ready: bool,
14    is_writer: bool,
15}
16
17impl ModuleInfo {
18    pub fn new(base_path: &str, info_name: &str, is_writer: bool) -> std::io::Result<ModuleInfo> {
19        if !Path::new(&base_path).exists() {
20            if let Err(e) = create_dir_all(base_path) {
21                error!("queue:{} create dir [{}], err={}", info_name, base_path, e);
22                return Err(e);
23            }
24        }
25
26        let info_path = base_path.to_owned() + "/module-info/";
27        if !Path::new(&info_path).exists() {
28            if let Err(e) = create_dir_all(&info_path) {
29                error!("queue:{} create dir [{}], err={}", info_name, info_path, e);
30                return Err(e);
31            }
32        }
33
34        let file_name_info = info_path + info_name + "_info";
35
36        let ff = if is_writer {
37            OpenOptions::new().read(true).write(is_writer).create(true).open(file_name_info)
38        } else {
39            OpenOptions::new().read(true).open(file_name_info)
40        };
41
42        if let Ok(f) = ff {
43            let mut mi = ModuleInfo {
44                _base_path: base_path.to_owned(),
45                name: info_name.to_owned(),
46                ff_info: f,
47                is_ready: true,
48                is_writer,
49            };
50
51            if mi.read_info().is_none() {
52                if let Err(e) = mi.put_info(0, 0) {
53                    info!("fail write module info, err={}", e);
54                }
55            }
56
57            Ok(mi)
58        } else {
59            Err(ff.err().unwrap())
60        }
61    }
62
63    pub fn put_info(&mut self, op_id: i64, committed_op_id: i64) -> std::io::Result<()> {
64        if !self.is_ready {
65            return Err(Error::new(ErrorKind::Other, "module_info not ready"));
66        }
67
68        if !self.is_writer {
69            return Err(Error::new(ErrorKind::Other, "module_info open as read only"));
70        }
71
72        //self.ff_info.set_len(0)?;
73        self.ff_info.seek(SeekFrom::Start(0))?;
74
75        let p = format!("{};{};{};", self.name, op_id, committed_op_id);
76        let mut hash = Hasher::new();
77        hash.update(p.as_bytes());
78
79        if let Err(e) = self.ff_info.write(format!("{}{:X}\n", p, hash.finalize()).as_bytes()) {
80            error!("fail put info push, set queue.ready = false, err={}", e);
81            self.is_ready = false;
82            return Err(e);
83        }
84
85        //self.ff_info.sync_data();
86        //info!("op_id={}", op_id);
87
88        Ok(())
89    }
90
91    pub fn read_modified(&self) -> std::io::Result<SystemTime> {
92        self.ff_info.sync_data()?;
93        self.ff_info.metadata()?.modified()
94    }
95
96    pub fn read_info(&mut self) -> Option<(i64, i64)> {
97        let mut res = false;
98        let mut op_id = 0;
99        let mut committed_op_id = 0;
100
101        if self.ff_info.seek(SeekFrom::Start(0)).is_err() {
102            return None;
103        }
104
105        if let Some(line) = BufReader::new(&self.ff_info).lines().next() {
106            res = true;
107            if let Ok(ll) = line {
108                let (module_name, _op_id, _committed_op_id, _crc) = scan_fmt!(&ll, "{};{};{};{}", String, i64, i64, String);
109
110                match module_name {
111                    Some(q) => {
112                        if q != self.name {
113                            res = false;
114                        }
115                    },
116                    None => res = false,
117                }
118
119                match _op_id {
120                    Some(q) => op_id = q,
121                    None => res = false,
122                }
123
124                match _committed_op_id {
125                    Some(q) => committed_op_id = q,
126                    None => res = false,
127                }
128            } else {
129                return None;
130            }
131        }
132
133        if res {
134            return Some((op_id, committed_op_id));
135        }
136
137        None
138    }
139}