1use crc32fast::Hasher;
2use std::fs::*;
3use std::io::{BufRead, BufReader};
4use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
5use std::path::Path;
6
/// Handle to a module's persisted progress record (the `<name>_info` file).
///
/// The record stores two operation ids for a named module and is read and
/// rewritten through the methods of this type.
pub struct ModuleInfo {
    /// Copy of the base directory passed to the constructor; kept for
    /// reference only (not read by the methods in this file).
    _base_path: String,
    /// Module name: written as the first field of the record and compared
    /// against the stored name when the record is read back.
    name: String,
    /// Open handle to the info file.
    ff_info: File,
    /// Cleared after a failed write; while `false`, further writes are refused.
    is_ready: bool,
    /// `true` when the file was opened for writing; read-only handles refuse writes.
    is_writer: bool,
}
14
15impl ModuleInfo {
16 pub fn new(base_path: &str, info_name: &str, is_writer: bool) -> std::io::Result<ModuleInfo> {
17 if !Path::new(&base_path).exists() {
18 if let Err(e) = create_dir_all(base_path.to_owned()) {
19 error!("queue:{} create dir [{}], err={}", info_name, base_path, e);
20 return Err(e);
21 }
22 }
23
24 let info_path = base_path.to_owned() + "/module-info/";
25 if !Path::new(&info_path).exists() {
26 if let Err(e) = create_dir_all(info_path.to_owned()) {
27 error!("queue:{} create dir [{}], err={}", info_name, info_path, e);
28 return Err(e);
29 }
30 }
31
32 let file_name_info = info_path + info_name + "_info";
33
34 let ff = if is_writer {
35 match OpenOptions::new().read(true).write(is_writer).create(true).open(file_name_info) {
36 Ok(ff) => Ok(ff),
37 Err(e) => Err(e),
38 }
39 } else {
40 match OpenOptions::new().read(true).open(file_name_info) {
41 Ok(ff) => Ok(ff),
42 Err(e) => Err(e),
43 }
44 };
45
46 if let Ok(f) = ff {
47 let mut mi = ModuleInfo {
48 _base_path: base_path.to_owned(),
49 name: info_name.to_owned(),
50 ff_info: f,
51 is_ready: true,
52 is_writer,
53 };
54
55 if mi.read_info().is_none() {
56 if let Err(e) = mi.put_info(0, 0) {
57 info!("fail write module info, err={}", e);
58 }
59 }
60
61 Ok(mi)
62 } else {
63 Err(ff.err().unwrap())
64 }
65 }
66
67 pub fn put_info(&mut self, op_id: i64, committed_op_id: i64) -> std::io::Result<()> {
68 if !self.is_ready {
69 return Err(Error::new(ErrorKind::Other, "module_info not ready"));
70 }
71
72 if !self.is_writer {
73 return Err(Error::new(ErrorKind::Other, "module_info open as read only"));
74 }
75
76 self.ff_info.seek(SeekFrom::Start(0))?;
78
79 let p = format!("{};{};{};", self.name, op_id, committed_op_id);
80 let mut hash = Hasher::new();
81 hash.update(p.as_bytes());
82
83 if let Err(e) = self.ff_info.write(format!("{}{:X}\n", p, hash.finalize()).as_bytes()) {
84 error!("fail put info push, set queue.ready = false, err={}", e);
85 self.is_ready = false;
86 return Err(e);
87 }
88
89 Ok(())
93 }
94
95 pub fn read_info(&mut self) -> Option<(i64, i64)> {
96 let mut res = false;
97 let mut op_id = 0;
98 let mut committed_op_id = 0;
99
100 if self.ff_info.seek(SeekFrom::Start(0)).is_err() {
101 return None;
102 }
103
104 if let Some(line) = BufReader::new(&self.ff_info).lines().next() {
105 res = true;
106 if let Ok(ll) = line {
107 let (module_name, _op_id, _committed_op_id, _crc) = scan_fmt!(&ll, "{};{};{};{}", String, i64, i64, String);
108
109 match module_name {
110 Some(q) => {
111 if q != self.name {
112 res = false;
113 }
114 }
115 None => res = false,
116 }
117
118 match _op_id {
119 Some(q) => op_id = q,
120 None => res = false,
121 }
122
123 match _committed_op_id {
124 Some(q) => committed_op_id = q,
125 None => res = false,
126 }
127 } else {
128 return None;
129 }
130 }
131
132 if res {
133 return Some((op_id, committed_op_id));
134 }
135
136 None
137 }
138}