Skip to main content

v_module_queue/
info.rs

1use crc32fast::Hasher;
2use scan_fmt::scan_fmt;
3use std::fs::*;
4use std::io::{BufRead, BufReader};
5use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
6use std::path::Path;
7use std::time::SystemTime;
8
9#[derive(Debug)]
10pub struct ModuleInfo {
11    _base_path: String,
12    name: String,
13    ff_info: File,
14    is_ready: bool,
15    is_writer: bool,
16}
17
18impl ModuleInfo {
19    pub fn new(base_path: &str, info_name: &str, is_writer: bool) -> std::io::Result<ModuleInfo> {
20        if !Path::new(base_path).exists() {
21            if let Err(e) = create_dir_all(base_path) {
22                error!("queue:{} create dir [{}], err={}", info_name, base_path, e);
23                return Err(e);
24            }
25        }
26
27        let info_path = base_path.to_owned() + "/module-info/";
28        if !Path::new(&info_path).exists() {
29            if let Err(e) = create_dir_all(&info_path) {
30                error!("queue:{} create dir [{}], err={}", info_name, info_path, e);
31                return Err(e);
32            }
33        }
34
35        let file_name_info = info_path + info_name + "_info";
36
37        let ff = if is_writer {
38            OpenOptions::new()
39                .read(true)
40                .write(is_writer)
41                .create(true)
42                .open(file_name_info)
43        } else {
44            OpenOptions::new().read(true).open(file_name_info)
45        };
46
47        if let Ok(f) = ff {
48            let mut mi = ModuleInfo {
49                _base_path: base_path.to_owned(),
50                name: info_name.to_owned(),
51                ff_info: f,
52                is_ready: true,
53                is_writer,
54            };
55
56            if mi.read_info().is_none() {
57                if let Err(e) = mi.put_info(0, 0) {
58                    info!("fail write module info, err={}", e);
59                }
60            }
61
62            Ok(mi)
63        } else {
64            Err(ff.err().unwrap())
65        }
66    }
67
68    pub fn put_info(&mut self, op_id: i64, committed_op_id: i64) -> std::io::Result<()> {
69        if !self.is_ready {
70            return Err(Error::new(ErrorKind::Other, "module_info not ready"));
71        }
72
73        if !self.is_writer {
74            return Err(Error::new(ErrorKind::Other, "module_info open as read only"));
75        }
76
77        self.ff_info.seek(SeekFrom::Start(0))?;
78
79        let p = format!("{};{};{};", self.name, op_id, committed_op_id);
80        let mut hash = Hasher::new();
81        hash.update(p.as_bytes());
82
83        if let Err(e) = self
84            .ff_info
85            .write(format!("{}{:X}\n", p, hash.finalize()).as_bytes())
86        {
87            error!("fail put info push, set queue.ready = false, err={}", e);
88            self.is_ready = false;
89            return Err(e);
90        }
91
92        Ok(())
93    }
94
95    pub fn read_modified(&self) -> std::io::Result<SystemTime> {
96        self.ff_info.sync_data()?;
97        self.ff_info.metadata()?.modified()
98    }
99
100    pub fn read_info(&mut self) -> Option<(i64, i64)> {
101        let mut res = false;
102        let mut op_id = 0;
103        let mut committed_op_id = 0;
104
105        if self.ff_info.seek(SeekFrom::Start(0)).is_err() {
106            return None;
107        }
108
109        if let Some(line) = BufReader::new(&self.ff_info).lines().next() {
110            res = true;
111            if let Ok(ll) = line {
112                match scan_fmt!(&ll, "{};{};{};{}", String, i64, i64, String) {
113                    Ok((q, oid, coid, _crc)) => {
114                        if q != self.name {
115                            res = false;
116                        }
117                        op_id = oid;
118                        committed_op_id = coid;
119                    }
120                    Err(_) => res = false,
121                }
122            } else {
123                return None;
124            }
125        }
126
127        if res {
128            return Some((op_id, committed_op_id));
129        }
130
131        None
132    }
133}