1use crc32fast::Hasher;
2use scan_fmt::scan_fmt;
3use std::fs::*;
4use std::io::{BufRead, BufReader};
5use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
6use std::path::Path;
7use std::time::SystemTime;
8
/// Handle to a module's on-disk info record.
///
/// The record is a single text line holding the module name, an operation id
/// and a committed operation id, followed by a checksum of that text.
#[derive(Debug)]
pub struct ModuleInfo {
    /// Base directory passed to the constructor; stored but not read by the
    /// methods of this type.
    _base_path: String,
    /// Module name: written as the first field of the record and compared
    /// against the stored name when the record is read back.
    name: String,
    /// Open handle to the info file.
    ff_info: File,
    /// Cleared when writing the record fails; further writes are refused
    /// while it is `false`.
    is_ready: bool,
    /// Whether the file was opened with write access; writes are refused
    /// when it is `false`.
    is_writer: bool,
}
17
18impl ModuleInfo {
19 pub fn new(base_path: &str, info_name: &str, is_writer: bool) -> std::io::Result<ModuleInfo> {
20 if !Path::new(base_path).exists() {
21 if let Err(e) = create_dir_all(base_path) {
22 error!("queue:{} create dir [{}], err={}", info_name, base_path, e);
23 return Err(e);
24 }
25 }
26
27 let info_path = base_path.to_owned() + "/module-info/";
28 if !Path::new(&info_path).exists() {
29 if let Err(e) = create_dir_all(&info_path) {
30 error!("queue:{} create dir [{}], err={}", info_name, info_path, e);
31 return Err(e);
32 }
33 }
34
35 let file_name_info = info_path + info_name + "_info";
36
37 let ff = if is_writer {
38 OpenOptions::new()
39 .read(true)
40 .write(is_writer)
41 .create(true)
42 .open(file_name_info)
43 } else {
44 OpenOptions::new().read(true).open(file_name_info)
45 };
46
47 if let Ok(f) = ff {
48 let mut mi = ModuleInfo {
49 _base_path: base_path.to_owned(),
50 name: info_name.to_owned(),
51 ff_info: f,
52 is_ready: true,
53 is_writer,
54 };
55
56 if mi.read_info().is_none() {
57 if let Err(e) = mi.put_info(0, 0) {
58 info!("fail write module info, err={}", e);
59 }
60 }
61
62 Ok(mi)
63 } else {
64 Err(ff.err().unwrap())
65 }
66 }
67
68 pub fn put_info(&mut self, op_id: i64, committed_op_id: i64) -> std::io::Result<()> {
69 if !self.is_ready {
70 return Err(Error::new(ErrorKind::Other, "module_info not ready"));
71 }
72
73 if !self.is_writer {
74 return Err(Error::new(ErrorKind::Other, "module_info open as read only"));
75 }
76
77 self.ff_info.seek(SeekFrom::Start(0))?;
78
79 let p = format!("{};{};{};", self.name, op_id, committed_op_id);
80 let mut hash = Hasher::new();
81 hash.update(p.as_bytes());
82
83 if let Err(e) = self
84 .ff_info
85 .write(format!("{}{:X}\n", p, hash.finalize()).as_bytes())
86 {
87 error!("fail put info push, set queue.ready = false, err={}", e);
88 self.is_ready = false;
89 return Err(e);
90 }
91
92 Ok(())
93 }
94
95 pub fn read_modified(&self) -> std::io::Result<SystemTime> {
96 self.ff_info.sync_data()?;
97 self.ff_info.metadata()?.modified()
98 }
99
100 pub fn read_info(&mut self) -> Option<(i64, i64)> {
101 let mut res = false;
102 let mut op_id = 0;
103 let mut committed_op_id = 0;
104
105 if self.ff_info.seek(SeekFrom::Start(0)).is_err() {
106 return None;
107 }
108
109 if let Some(line) = BufReader::new(&self.ff_info).lines().next() {
110 res = true;
111 if let Ok(ll) = line {
112 match scan_fmt!(&ll, "{};{};{};{}", String, i64, i64, String) {
113 Ok((q, oid, coid, _crc)) => {
114 if q != self.name {
115 res = false;
116 }
117 op_id = oid;
118 committed_op_id = coid;
119 }
120 Err(_) => res = false,
121 }
122 } else {
123 return None;
124 }
125 }
126
127 if res {
128 return Some((op_id, committed_op_id));
129 }
130
131 None
132 }
133}