use crc32fast::Hasher;
use scan_fmt::scan_fmt;
use std::fs::*;
use std::io::{BufRead, BufReader};
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::path::Path;
use std::time::SystemTime;
/// Handle to a small on-disk "info" file that stores a pair of counters
/// (`op_id`, `committed_op_id`) under a given name.
///
/// The file holds a single text record; see `ModuleInfo::put_info` for the
/// layout that is written and `ModuleInfo::read_info` for how it is parsed.
#[derive(Debug)]
pub struct ModuleInfo {
/// Base directory passed to `new`. Stored but not read by the code in this file.
_base_path: String,
/// Name written as the first field of the record and compared on read.
name: String,
/// Open handle to the info file (`<base_path>/module-info/<name>_info`).
ff_info: File,
/// Set to `true` on construction and to `false` after a failed write;
/// `put_info` refuses to run while it is `false`.
is_ready: bool,
/// `true` when the file was opened for writing; `put_info` refuses to run otherwise.
is_writer: bool,
}
impl ModuleInfo {
pub fn new(base_path: &str, info_name: &str, is_writer: bool) -> std::io::Result<ModuleInfo> {
if !Path::new(base_path).exists() {
if let Err(e) = create_dir_all(base_path) {
error!("queue:{} create dir [{}], err={}", info_name, base_path, e);
return Err(e);
}
}
let info_path = base_path.to_owned() + "/module-info/";
if !Path::new(&info_path).exists() {
if let Err(e) = create_dir_all(&info_path) {
error!("queue:{} create dir [{}], err={}", info_name, info_path, e);
return Err(e);
}
}
let file_name_info = info_path + info_name + "_info";
let ff = if is_writer {
OpenOptions::new()
.read(true)
.write(is_writer)
.create(true)
.open(file_name_info)
} else {
OpenOptions::new().read(true).open(file_name_info)
};
if let Ok(f) = ff {
let mut mi = ModuleInfo {
_base_path: base_path.to_owned(),
name: info_name.to_owned(),
ff_info: f,
is_ready: true,
is_writer,
};
if mi.read_info().is_none() {
if let Err(e) = mi.put_info(0, 0) {
info!("fail write module info, err={}", e);
}
}
Ok(mi)
} else {
Err(ff.err().unwrap())
}
}
pub fn put_info(&mut self, op_id: i64, committed_op_id: i64) -> std::io::Result<()> {
if !self.is_ready {
return Err(Error::new(ErrorKind::Other, "module_info not ready"));
}
if !self.is_writer {
return Err(Error::new(ErrorKind::Other, "module_info open as read only"));
}
self.ff_info.seek(SeekFrom::Start(0))?;
let p = format!("{};{};{};", self.name, op_id, committed_op_id);
let mut hash = Hasher::new();
hash.update(p.as_bytes());
if let Err(e) = self
.ff_info
.write(format!("{}{:X}\n", p, hash.finalize()).as_bytes())
{
error!("fail put info push, set queue.ready = false, err={}", e);
self.is_ready = false;
return Err(e);
}
Ok(())
}
pub fn read_modified(&self) -> std::io::Result<SystemTime> {
self.ff_info.sync_data()?;
self.ff_info.metadata()?.modified()
}
pub fn read_info(&mut self) -> Option<(i64, i64)> {
let mut res = false;
let mut op_id = 0;
let mut committed_op_id = 0;
if self.ff_info.seek(SeekFrom::Start(0)).is_err() {
return None;
}
if let Some(line) = BufReader::new(&self.ff_info).lines().next() {
res = true;
if let Ok(ll) = line {
match scan_fmt!(&ll, "{};{};{};{}", String, i64, i64, String) {
Ok((q, oid, coid, _crc)) => {
if q != self.name {
res = false;
}
op_id = oid;
committed_op_id = coid;
}
Err(_) => res = false,
}
} else {
return None;
}
}
if res {
return Some((op_id, committed_op_id));
}
None
}
}