v_common_module/
info.rs

1use crc32fast::Hasher;
2use std::fs::*;
3use std::io::{BufRead, BufReader};
4use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
5use std::path::Path;
6
/// Handle to a module's info file, which holds a single text record of the
/// form `name;op_id;committed_op_id;<crc>` terminated by a newline.
///
/// Instances are created with [`ModuleInfo::new`]. `Debug` is derived so that
/// the `Result` returned by `new` can be used with `unwrap_err`/`expect_err`
/// and so that the handle can be embedded in other `Debug` types.
#[derive(Debug)]
pub struct ModuleInfo {
    /// Base directory given to `new`; stored, but not read by the methods of this type.
    _base_path: String,
    /// Module name: written as the first field of every record and compared on read.
    name: String,
    /// Open handle to the `<base_path>/module-info/<name>_info` file.
    ff_info: File,
    /// Set to `false` after a failed write; `put_info` refuses to run while it is `false`.
    is_ready: bool,
    /// Whether the file was opened for writing; `put_info` is rejected otherwise.
    is_writer: bool,
}
14
15impl ModuleInfo {
16    pub fn new(base_path: &str, info_name: &str, is_writer: bool) -> std::io::Result<ModuleInfo> {
17        if !Path::new(&base_path).exists() {
18            if let Err(e) = create_dir_all(base_path.to_owned()) {
19                error!("queue:{} create dir [{}], err={}", info_name, base_path, e);
20                return Err(e);
21            }
22        }
23
24        let info_path = base_path.to_owned() + "/module-info/";
25        if !Path::new(&info_path).exists() {
26            if let Err(e) = create_dir_all(info_path.to_owned()) {
27                error!("queue:{} create dir [{}], err={}", info_name, info_path, e);
28                return Err(e);
29            }
30        }
31
32        let file_name_info = info_path + info_name + "_info";
33
34        let ff = if is_writer {
35            match OpenOptions::new().read(true).write(is_writer).create(true).open(file_name_info) {
36                Ok(ff) => Ok(ff),
37                Err(e) => Err(e),
38            }
39        } else {
40            match OpenOptions::new().read(true).open(file_name_info) {
41                Ok(ff) => Ok(ff),
42                Err(e) => Err(e),
43            }
44        };
45
46        if let Ok(f) = ff {
47            let mut mi = ModuleInfo {
48                _base_path: base_path.to_owned(),
49                name: info_name.to_owned(),
50                ff_info: f,
51                is_ready: true,
52                is_writer,
53            };
54
55            if mi.read_info().is_none() {
56                if let Err(e) = mi.put_info(0, 0) {
57                    info!("fail write module info, err={}", e);
58                }
59            }
60
61            Ok(mi)
62        } else {
63            Err(ff.err().unwrap())
64        }
65    }
66
67    pub fn put_info(&mut self, op_id: i64, committed_op_id: i64) -> std::io::Result<()> {
68        if !self.is_ready {
69            return Err(Error::new(ErrorKind::Other, "module_info not ready"));
70        }
71
72        if !self.is_writer {
73            return Err(Error::new(ErrorKind::Other, "module_info open as read only"));
74        }
75
76        //self.ff_info.set_len(0)?;
77        self.ff_info.seek(SeekFrom::Start(0))?;
78
79        let p = format!("{};{};{};", self.name, op_id, committed_op_id);
80        let mut hash = Hasher::new();
81        hash.update(p.as_bytes());
82
83        if let Err(e) = self.ff_info.write(format!("{}{:X}\n", p, hash.finalize()).as_bytes()) {
84            error!("fail put info push, set queue.ready = false, err={}", e);
85            self.is_ready = false;
86            return Err(e);
87        }
88
89        //self.ff_info.sync_data();
90        //info!("op_id={}", op_id);
91
92        Ok(())
93    }
94
95    pub fn read_info(&mut self) -> Option<(i64, i64)> {
96        let mut res = false;
97        let mut op_id = 0;
98        let mut committed_op_id = 0;
99
100        if self.ff_info.seek(SeekFrom::Start(0)).is_err() {
101            return None;
102        }
103
104        if let Some(line) = BufReader::new(&self.ff_info).lines().next() {
105            res = true;
106            if let Ok(ll) = line {
107                let (module_name, _op_id, _committed_op_id, _crc) = scan_fmt!(&ll, "{};{};{};{}", String, i64, i64, String);
108
109                match module_name {
110                    Some(q) => {
111                        if q != self.name {
112                            res = false;
113                        }
114                    }
115                    None => res = false,
116                }
117
118                match _op_id {
119                    Some(q) => op_id = q,
120                    None => res = false,
121                }
122
123                match _committed_op_id {
124                    Some(q) => committed_op_id = q,
125                    None => res = false,
126                }
127            } else {
128                return None;
129            }
130        }
131
132        if res {
133            return Some((op_id, committed_op_id));
134        }
135
136        None
137    }
138}