use crate::fs2::FileExt;
use crate::record::*;
use crc32fast::Hasher;
use std::fs::*;
use std::io::prelude::*;
use std::io::SeekFrom;
use std::io::{BufRead, BufReader};
use std::path::*;
/// Writer/reader handle for a file-backed queue that is split into numbered parts.
///
/// On-disk layout, as built by the methods of this type:
/// * `<base_path>/<name>_info_queue` - one CRC-protected line holding the current part id;
/// * `<base_path>/<name>_queue.lock` - lock file, taken exclusively in `Mode::ReadWrite`;
/// * `<base_path>/<name>-<id>/<name>_queue` - record data of part `<id>`;
/// * `<base_path>/<name>-<id>/<name>_info_push` - one CRC-protected line holding the
///   right edge and the record count of part `<id>`.
pub struct Queue {
/// Directory that contains all files of the queue.
pub base_path: String,
/// Access mode; `push` refuses to run in `Mode::Read`.
mode: Mode,
/// Cleared when opening or rewriting one of the queue files fails; `push` and
/// `open_part` return `ErrorQueue::NotReady` while it is `false`.
pub is_ready: bool,
/// Queue name; used in every file name and verified against the info files.
pub name: String,
/// Data file of the current part. Right after `new` in read mode this is still the
/// `_info_queue` handle; `open_part` replaces it with the part's `_queue` file.
pub ff_queue: File,
/// `_info_push` file of the part selected by the last `open_info_push` call.
ff_info_push: File,
/// `_info_queue` file of the whole queue.
ff_info_queue: File,
/// Keeps the exclusively locked lock file open in `Mode::ReadWrite`; `None` otherwise.
_lock_file: Option<File>,
/// Byte offset just past the last record of the current part.
pub(crate) right_edge: u64,
/// Number of records pushed into the current part.
pub count_pushed: u32,
/// Id of the current part.
pub id: u32,
}
impl Queue {
pub fn new(base_path: &str, queue_name: &str, in_mode: Mode) -> Result<Queue, ErrorQueue> {
if !Path::new(&base_path).exists() {
if let Err(e) = create_dir_all(base_path) {
error!("queue:{} create path, err={}", queue_name, e);
return Err(ErrorQueue::FailWrite);
}
}
let file_name_info_queue = base_path.to_owned() + "/" + queue_name + "_info_queue";
let wfqi = if in_mode == Mode::ReadWrite {
OpenOptions::new().read(true).write(true).create(true).open(file_name_info_queue)
} else {
OpenOptions::new().read(true).open(file_name_info_queue)
};
if let Ok(fqi) = wfqi {
let tmp_f1 = match fqi.try_clone() {
Ok(f) => f,
Err(e) => {
error!("queue:{} clone info_queue fd, err={}", queue_name, e);
return Err(ErrorQueue::FailOpen);
},
};
let tmp_f2 = match fqi.try_clone() {
Ok(f) => f,
Err(e) => {
error!("queue:{} clone info_push fd, err={}", queue_name, e);
return Err(ErrorQueue::FailOpen);
},
};
let mut queue = Queue {
base_path: base_path.to_string(),
mode: in_mode,
is_ready: true,
name: queue_name.to_owned(),
count_pushed: 0,
right_edge: 0,
ff_queue: fqi,
ff_info_queue: tmp_f1,
ff_info_push: tmp_f2,
_lock_file: None,
id: 0,
};
let info_is_ok = queue.get_info_queue();
if in_mode == Mode::ReadWrite {
let file_name_lock = queue.base_path.to_owned() + "/" + queue_name + "_queue.lock";
match OpenOptions::new().read(true).write(true).create(true).open(file_name_lock) {
Ok(file) => {
if let Err(e) = file.try_lock_exclusive() {
error!("queue:{}:{} attempt lock, err={}", queue.name, queue.id, e);
return Err(ErrorQueue::AlreadyOpen);
}
queue._lock_file = Some(file);
},
Err(e) => {
error!("queue:{}:{} prepare lock, err={}", queue.name, queue.id, e);
return Err(ErrorQueue::FailOpen);
},
}
if info_is_ok {
let previous_id = queue.id;
let previous_is_empty = match queue.get_info_of_part(previous_id, true) {
Ok(()) => queue.count_pushed == 0,
Err(_) => false,
};
if previous_is_empty {
queue.right_edge = 0;
} else {
queue.id = previous_id + 1;
queue.count_pushed = 0;
queue.right_edge = 0;
}
}
let part_name = queue.name.to_owned() + "-" + &queue.id.to_string();
if !Path::new(&part_name).exists() {
if let Err(e) = create_dir_all(queue.base_path.to_owned() + "/" + &part_name) {
error!("queue:{}:{} create path, err={}", queue.name, queue.id, e);
return Err(ErrorQueue::FailWrite);
}
}
if let Err(e) = queue.open_part(queue.id) {
error!("queue:{}:{} open part, err={:?}", queue.name, queue.id, e);
return Err(ErrorQueue::FailOpen);
}
if let Err(e) = queue.put_info_push() {
error!("queue:{}:{} open, write info of part, err={:?}", queue.name, queue.id, e);
return Err(ErrorQueue::FailWrite);
}
if let Err(e) = queue.put_info_queue() {
error!("queue:{}:{} open, write info of queue, err={:?}", queue.name, queue.id, e);
return Err(ErrorQueue::FailWrite);
}
}
if info_is_ok {
if let Err(e) = queue.get_info_of_part(queue.id, true) {
error!("queue:{}:{} open, get info of part: {}", queue.name, queue.id, e.as_str());
if in_mode == Mode::Read {
return Err(e);
}
}
} else if in_mode == Mode::Read {
error!("queue:{} open(Read), info_queue unreadable or CRC mismatch", queue.name);
return Err(ErrorQueue::InvalidChecksum);
}
return Ok(queue);
}
Err(ErrorQueue::NotReady)
}
pub fn push(&mut self, data: &[u8], in_msg_type: MsgType) -> Result<u64, ErrorQueue> {
if !self.is_ready || self.mode == Mode::Read || data.len() > std::u32::MAX as usize / 2 {
return Err(ErrorQueue::NotReady);
}
let header = Header {
start_pos: self.right_edge,
msg_length: data.len() as u32,
magic_marker: MAGIC_MARKER,
count_pushed: self.count_pushed + 1,
crc: 0,
msg_type: in_msg_type,
};
let mut bheader = [0; HEADER_SIZE];
header.to_buf(&mut bheader);
let mut hash = Hasher::new();
hash.update(&bheader);
hash.update(data);
let bhash = u32::to_ne_bytes(hash.finalize());
bheader[21] = bhash[0];
bheader[22] = bhash[1];
bheader[23] = bhash[2];
bheader[24] = bhash[3];
let restore_pos = self.right_edge;
if let Err(e) = self.ff_queue.write_all(&bheader) {
error!("queue:{}:{} push, write header, err={}", self.name, self.id, e);
self.rollback_queue_tail(restore_pos);
return Err(ErrorQueue::FailWrite);
}
if let Err(e) = self.ff_queue.write_all(data) {
error!("queue:{}:{} push, write body, err={}", self.name, self.id, e);
self.rollback_queue_tail(restore_pos);
return Err(ErrorQueue::FailWrite);
}
self.right_edge += bheader.len() as u64 + data.len() as u64;
self.count_pushed += 1;
if let Err(e) = self.put_info_push() {
error!("queue:{}:{} push, write info, err={:?}", self.name, self.id, e);
self.right_edge = restore_pos;
self.count_pushed -= 1;
self.rollback_queue_tail(restore_pos);
return Err(ErrorQueue::FailWrite);
}
Ok(self.right_edge)
}
/// Best-effort undo of a partially written record: cuts the data file back to `pos`
/// and moves the file cursor there. Failures are only logged as warnings.
fn rollback_queue_tail(&mut self, pos: u64) {
    match self.ff_queue.set_len(pos) {
        Ok(()) => {},
        Err(e) => {
            warn!("queue:{}:{} rollback set_len({}), err={}", self.name, self.id, pos, e);
        },
    }

    match self.ff_queue.seek(SeekFrom::Start(pos)) {
        Ok(_) => {},
        Err(e) => {
            warn!("queue:{}:{} rollback seek({}), err={}", self.name, self.id, pos, e);
        },
    }
}
/// Rewrites the part info file with the line `<name>;<right_edge>;<count_pushed>;<crc>`,
/// where `<crc>` is the CRC32 of everything before it (including the last `;`).
///
/// The new line is written over the old content first and the file is trimmed to the
/// new length afterwards. The file is therefore never empty in between: an empty info
/// file is read back as "0 bytes, 0 records", which would make an interrupted update
/// look like a valid empty part.
///
/// # Errors
/// Returns `FailWrite` and clears `is_ready` when seeking, writing or trimming fails.
fn put_info_push(&mut self) -> Result<(), ErrorQueue> {
    let p = format!("{};{};{};", self.name, self.right_edge, self.count_pushed);
    let mut hash = Hasher::new();
    hash.update(p.as_bytes());
    let payload = format!("{}{}\n", p, hash.finalize());

    if let Err(e) = self.ff_info_push.seek(SeekFrom::Start(0)) {
        error!("fail put info push, seek err={}", e);
        self.is_ready = false;
        return Err(ErrorQueue::FailWrite);
    }
    if let Err(e) = self.ff_info_push.write_all(payload.as_bytes()) {
        error!("fail put info push, write err={}", e);
        self.is_ready = false;
        return Err(ErrorQueue::FailWrite);
    }
    // Drop whatever is left of a longer previous line.
    if let Err(e) = self.ff_info_push.set_len(payload.len() as u64) {
        error!("fail put info push, set_len err={}", e);
        self.is_ready = false;
        return Err(ErrorQueue::FailWrite);
    }
    Ok(())
}
/// Rewrites the queue info file with the line `<name>;<id>;<crc>`, where `<crc>` is the
/// CRC32 of everything before it (including the last `;`).
///
/// The new line is written over the old content first and the file is trimmed to the
/// new length afterwards, so the file is never left empty by an interrupted update
/// (an empty file makes `get_info_queue` report that no valid info exists).
///
/// # Errors
/// Returns `FailWrite` and clears `is_ready` when seeking, writing or trimming fails.
fn put_info_queue(&mut self) -> Result<(), ErrorQueue> {
    let p = format!("{};{};", self.name, self.id);
    let mut hash = Hasher::new();
    hash.update(p.as_bytes());
    let payload = format!("{}{}\n", p, hash.finalize());

    if let Err(e) = self.ff_info_queue.seek(SeekFrom::Start(0)) {
        error!("fail put info queue, seek err={}", e);
        self.is_ready = false;
        return Err(ErrorQueue::FailWrite);
    }
    if let Err(e) = self.ff_info_queue.write_all(payload.as_bytes()) {
        error!("fail put info queue, write err={}", e);
        self.is_ready = false;
        return Err(ErrorQueue::FailWrite);
    }
    // Drop whatever is left of a longer previous line.
    if let Err(e) = self.ff_info_queue.set_len(payload.len() as u64) {
        error!("fail put info queue, set_len err={}", e);
        self.is_ready = false;
        return Err(ErrorQueue::FailWrite);
    }
    Ok(())
}
/// Points `ff_info_push` at `<base_path>/<name>-<part_id>/<name>_info_push`.
///
/// The file is opened read/write (and created if missing) in `Mode::ReadWrite`,
/// read-only in any other mode. On failure `is_ready` is cleared and `FailOpen`
/// is returned.
fn open_info_push(&mut self, part_id: u32) -> Result<(), ErrorQueue> {
    let ipp = format!("{}/{}-{}/{}_info_push", self.base_path, self.name, part_id, self.name);

    let mut options = OpenOptions::new();
    options.read(true);
    if self.mode == Mode::ReadWrite {
        options.write(true).create(true);
    }

    match options.open(&ipp) {
        Ok(ff) => {
            self.ff_info_push = ff;
            Ok(())
        },
        Err(e) => {
            debug!("[{}] fail open info push, part {}, mode={:?}, err={:?} {}", self.name, part_id, self.mode, e, ipp);
            self.is_ready = false;
            Err(ErrorQueue::FailOpen)
        },
    }
}
/// Switches the queue to part `part_id`: opens the part's `_info_push` and `_queue`
/// files, records `part_id` as the current id and loads the part's right edge and
/// record count.
///
/// Files are opened read/write (and created if missing) in `Mode::ReadWrite`,
/// read-only in any other mode.
///
/// # Errors
/// * `NotReady` - the queue was already marked not ready;
/// * `FailOpen` - one of the two files could not be opened (`is_ready` is cleared);
/// * any error reported by `get_info_of_part` while reading the part info.
pub fn open_part(&mut self, part_id: u32) -> Result<(), ErrorQueue> {
    if !self.is_ready {
        return Err(ErrorQueue::NotReady);
    }

    self.open_info_push(part_id)?;

    let qpp = format!("{}/{}-{}/{}_queue", self.base_path, self.name, part_id, self.name);
    let ffq = if self.mode == Mode::ReadWrite {
        OpenOptions::new().read(true).write(true).create(true).open(&qpp)
    } else {
        OpenOptions::new().read(true).open(&qpp)
    };

    match ffq {
        Ok(f) => self.ff_queue = f,
        Err(e) => {
            // Keep the underlying I/O error and path in the log, in the same shape as
            // the message emitted for the info_push file.
            debug!("[{}] fail open part {}, mode={:?}, err={:?} {}", self.name, part_id, self.mode, e, qpp);
            self.is_ready = false;
            return Err(ErrorQueue::FailOpen);
        },
    }

    self.id = part_id;
    debug!("[{}] open part {}", self.name, part_id);
    self.get_info_of_part(self.id, false)
}
/// Loads the current part id from the first line of the queue info file.
///
/// The line must have the form `<name>;<id>;<crc>`, `<name>` must equal this queue's
/// name and `<crc>` must be the CRC32 of `<name>;<id>;`. Returns `true` and stores
/// `<id>` in `self.id` when all checks pass; returns `false` (leaving `self.id`
/// untouched) when the file is empty, unreadable, malformed or fails a check.
pub fn get_info_queue(&mut self) -> bool {
    if self.ff_info_queue.seek(SeekFrom::Start(0)).is_err() {
        return false;
    }

    let first_line = BufReader::new(&self.ff_info_queue).lines().next();
    let line = match first_line {
        None => return false,
        Some(Err(e)) => {
            error!("queue:{} get_info_queue, read line err={}", self.name, e);
            return false;
        },
        Some(Ok(text)) => text,
    };

    let (stored_name, stored_id, stored_crc) = match scan_fmt!(&line, "{};{};{}", String, u32, u32) {
        Err(e) => {
            error!("queue:{} get_info_queue, parse err={}, line=[{}]", self.name, e, line);
            return false;
        },
        Ok(fields) => fields,
    };

    if stored_name != self.name {
        return false;
    }

    // Recompute the checksum over exactly the text the writer hashed.
    let mut hasher = Hasher::new();
    hasher.update(format!("{};{};", stored_name, stored_id).as_bytes());
    let computed_crc = hasher.finalize();

    if computed_crc == stored_crc {
        self.id = stored_id;
        true
    } else {
        error!("queue:{} get_info_queue, CRC mismatch", self.name);
        false
    }
}
/// Loads `right_edge` and `count_pushed` from the info file of part `part_id`.
///
/// The info file is (re)opened when `reopen` is set or when `part_id` differs from the
/// current part id; otherwise the already open handle is reused. An empty file is
/// treated as a part with right edge 0 and no records. A non-empty file must start
/// with `<name>;<right_edge>;<count>;<crc>`.
///
/// # Errors
/// * an error from `open_info_push` when the info file has to be opened and cannot be;
/// * `FailRead` - seeking or reading the file failed;
/// * `Other` - the line cannot be parsed or names a different queue;
/// * `InvalidChecksum` - the stored CRC does not match the line content.
pub fn get_info_of_part(&mut self, part_id: u32, reopen: bool) -> Result<(), ErrorQueue> {
    let is_current_part = self.id == part_id;
    if reopen || !is_current_part {
        self.open_info_push(part_id)?;
    }

    self.ff_info_push.seek(SeekFrom::Start(0)).map_err(|_| ErrorQueue::FailRead)?;

    let first_line = BufReader::new(&self.ff_info_push).lines().next();
    let (right_edge, count_pushed) = match first_line {
        // Nothing written yet: an empty part.
        None => (0u64, 0u32),
        Some(Err(e)) => {
            error!("queue:{}:{} get_info_of_part, read line err={}", self.name, part_id, e);
            return Err(ErrorQueue::FailRead);
        },
        Some(Ok(ll)) => {
            let (stored_name, position, pushed, stored_crc) = match scan_fmt!(&ll, "{};{};{};{}", String, u64, u32, u32) {
                Err(e) => {
                    error!("queue:{}:{} get_info_of_part, parse err={}, line=[{}]", self.name, part_id, e, ll);
                    return Err(ErrorQueue::Other);
                },
                Ok(fields) => fields,
            };

            if stored_name != self.name {
                return Err(ErrorQueue::Other);
            }

            // Recompute the checksum over exactly the text the writer hashed.
            let mut hasher = Hasher::new();
            hasher.update(format!("{};{};{};", stored_name, position, pushed).as_bytes());
            if hasher.finalize() != stored_crc {
                error!("queue:{}:{} get_info_of_part, CRC mismatch", self.name, part_id);
                return Err(ErrorQueue::InvalidChecksum);
            }

            (position, pushed)
        },
    };

    self.right_edge = right_edge;
    self.count_pushed = count_pushed;
    Ok(())
}
}