use crate::compress::PartDictionary;
use crate::queue::*;
use crate::record::*;
use crc32fast::Hasher;
use fs2::FileExt;
use std::cmp::Ordering;
use std::fs::*;
use std::io::prelude::*;
use std::io::SeekFrom;
use std::io::{BufRead, BufReader};
use std::path::Path;
/// Reader side of a part-based on-disk queue.
///
/// A consumer remembers which part it is reading, the byte offset of the next
/// record in that part and how many records it has popped from it; `commit`
/// persists that state to `<base_path>/<queue>_info_pop_<name>`.
pub struct Consumer {
/// Access mode the consumer was opened with; only `Mode::ReadWrite` takes the lock file.
mode: Mode,
/// Consumer name; it is part of the info-file and lock-file names.
pub name: String,
/// Queue handle used to read part metadata and record data.
pub queue: Queue,
/// Number of records popped from the current part (reset to 0 on a part switch).
pub count_popped: u32,
/// Id of the part this consumer is currently positioned on.
pub id: u32,
/// Cleared when the info file cannot be opened/written or a record fails
/// validation; `pop_body` refuses to work while it is `false`.
is_ready: bool,
/// Byte offset, inside the current part, of the next record header to read.
pos_record: u64,
/// Open handle of the info file holding the persisted consumer state.
ff_info_pop: File,
/// Exclusively locked `.lock` file; `Some` only when the lock was acquired.
_lock_file: Option<File>,
/// Directory that contains the info and lock files.
base_path: String,
/// Payload length of the record whose header was read last, after
/// decompression (equals `msg_length` for uncompressed records).
uncompressed_len: u32,
/// Dictionary used to decompress compressed record bodies.
part_dict: PartDictionary,
/// Header of the last record that `read_header` accepted.
pub header: Header,
/// Running CRC32 over the current record (header with the CRC bytes zeroed,
/// then the stored body bytes).
hash: Hasher,
}
impl Drop for Consumer {
fn drop(&mut self) {
if self.mode != Mode::Read {
let _ = self._lock_file.take();
let info_name_lock = self.base_path.to_owned() + "/" + &self.queue.name + "_info_pop_" + &self.name + ".lock";
if let Err(e) = remove_file(&info_name_lock) {
error!("[queue:consumer] drop: queue:{}:{}:{}, fail remove lock file {}, err={:?}", self.queue.name, self.queue.id, self.name, &info_name_lock, e);
}
}
}
}
impl Consumer {
pub fn new(base_path: &str, consumer_name: &str, queue_name: &str) -> Result<Consumer, ErrorQueue> {
Consumer::new_with_mode(base_path, consumer_name, queue_name, Mode::ReadWrite)
}
/// Opens a consumer with an explicit access mode.
///
/// Steps, in order:
/// 1. open the queue read-only and load its info (`NotReady` on failure);
/// 2. in `Mode::ReadWrite`, create and exclusively lock
///    `<info file>.lock` (`FailOpen` if it cannot be opened, `AlreadyOpen`
///    if the lock is taken);
/// 3. open the info file (created if missing in `Mode::ReadWrite`);
/// 4. if the info file already existed and parses, resume from the stored
///    part/offset; otherwise start at offset 0 of the queue's current part
///    and persist that state.
///
/// # Errors
/// `ErrorQueue::NotReady`, `ErrorQueue::FailOpen` or `ErrorQueue::AlreadyOpen`
/// as described above.
pub fn new_with_mode(base_path: &str, consumer_name: &str, queue_name: &str, mode: Mode) -> Result<Consumer, ErrorQueue> {
    let info_name = format!("{}/{}_info_pop_{}", base_path, queue_name, consumer_name);
    // Must be sampled before the info file is (possibly) created below.
    let exists = Path::new(&info_name).exists();

    let mut q = match Queue::new(base_path, queue_name, Mode::Read) {
        Ok(q) => q,
        Err(_e) => return Err(ErrorQueue::NotReady),
    };
    if !q.get_info_queue() {
        return Err(ErrorQueue::NotReady);
    }

    // Writers take an exclusive lock so that only one of them uses this name.
    let mut lock_file_opt = None;
    if mode == Mode::ReadWrite {
        let info_name_lock = format!("{}.lock", info_name);
        match OpenOptions::new().read(true).write(true).create(true).open(info_name_lock) {
            Err(e) => {
                error!("consumer:{} prepare lock, err={}", consumer_name, e);
                return Err(ErrorQueue::FailOpen);
            },
            Ok(file) => {
                if let Err(e) = file.try_lock_exclusive() {
                    error!("consumer:{} attempt lock, err={}", consumer_name, e);
                    return Err(ErrorQueue::AlreadyOpen);
                }
                lock_file_opt = Some(file);
            },
        }
    }

    let mut info_opts = OpenOptions::new();
    info_opts.read(true);
    if mode == Mode::ReadWrite {
        info_opts.write(true).create(true);
    }
    let ff = match info_opts.open(&info_name) {
        Ok(ff) => ff,
        Err(_e) => return Err(ErrorQueue::NotReady),
    };

    let mut consumer = Consumer {
        mode,
        is_ready: true,
        name: consumer_name.to_owned(),
        ff_info_pop: ff,
        _lock_file: lock_file_opt,
        queue: q,
        count_popped: 0,
        pos_record: 0,
        uncompressed_len: 0,
        part_dict: PartDictionary::new(),
        hash: Hasher::new(),
        header: Header {
            start_pos: 0,
            msg_length: 0,
            magic_marker: 0,
            count_pushed: 0,
            crc: 0,
            msg_type: MsgType::String,
            compressed: false,
        },
        base_path: base_path.to_string(),
        id: 0,
    };

    // Resume from the persisted state when there is one.
    if exists && consumer.get_info() {
        // A failure to open the stored part is tolerated here; only a failed
        // seek inside a successfully opened part is reported.
        if consumer.queue.open_part(consumer.id).is_ok()
            && consumer.queue.ff_queue.seek(SeekFrom::Start(consumer.pos_record)).is_err()
        {
            return Err(ErrorQueue::NotReady);
        }
        return Ok(consumer);
    }

    // Fresh (or unreadable) state: start at the beginning of the current part.
    consumer.id = consumer.queue.id;
    consumer.pos_record = 0;
    consumer.count_popped = 0;
    if consumer.queue.open_part(consumer.id).is_err() {
        return Err(ErrorQueue::NotReady);
    }
    consumer.open(true);
    if !consumer.commit() {
        return Err(ErrorQueue::NotReady);
    }
    Ok(consumer)
}
/// Returns how many records can currently be popped from the part the
/// consumer is positioned on, switching to the next part when the current
/// one is exhausted.
pub fn get_batch_size(&mut self) -> u32 {
    // Level 0 marks the outermost call; it is the only level allowed to
    // advance to the next part.
    let top_level: u16 = 0;
    self.get_batch_size_l(top_level)
}
fn get_batch_size_l(&mut self, level: u16) -> u32 {
if self.queue.count_pushed < self.count_popped {
return 0;
}
let delta = self.queue.count_pushed - self.count_popped;
match delta.cmp(&0) {
Ordering::Equal => {
if self.queue.id == self.id {
let _ = self.queue.get_info_of_part(self.id, false);
if self.queue.count_pushed > self.count_popped {
return self.queue.count_pushed - self.count_popped;
}
}
self.queue.get_info_queue();
if self.queue.id > self.id {
if level == 0 && self.go_to_next_part() {
return self.get_batch_size_l(level + 1);
}
return 0;
}
},
Ordering::Greater => {
return if self.queue.id != self.id {
if level == 0 && self.go_to_next_part() {
return self.get_batch_size_l(level + 1);
}
0
} else {
delta
}
},
_ => {},
}
0
}
/// (Re)opens the consumer info file and stores the handle in `self`.
///
/// In `Mode::ReadWrite` the file is opened for read/write and truncated, and
/// is created when `is_new` is `true`; in any other mode it is opened
/// read-only. Returns `false` (and marks the consumer not ready) when the
/// queue is not ready or the file cannot be opened.
pub fn open(&mut self, is_new: bool) -> bool {
    if !self.queue.is_ready {
        error!("[queue:consumer] open: queue not ready, set consumer.ready = false");
        self.is_ready = false;
        return false;
    }

    let info_pop_file_name = format!("{}/{}_info_pop_{}", self.queue.base_path, self.queue.name, self.name);

    let mut opts = OpenOptions::new();
    opts.read(true);
    if self.mode == Mode::ReadWrite {
        opts.write(true).truncate(true).create(is_new);
    }

    match opts.open(&info_pop_file_name) {
        Ok(ff) => {
            self.ff_info_pop = ff;
            true
        },
        Err(_) => {
            error!("[queue:consumer] open: fail open file [{}], set consumer.ready = false", info_pop_file_name);
            self.is_ready = false;
            false
        },
    }
}
/// Loads the persisted consumer state from the first line of the info file.
///
/// The line has the form `queue;consumer;position;count_popped;id`. On a
/// successful parse `pos_record`, `count_popped` and `id` are overwritten
/// with the stored values, even when the stored names do not match.
///
/// Returns `false` when the file cannot be rewound, the line cannot be read
/// or parsed, or the stored queue/consumer name differs from this consumer.
/// An empty file leaves the fields untouched and yields `true`.
pub fn get_info(&mut self) -> bool {
    if self.ff_info_pop.seek(SeekFrom::Start(0)).is_err() {
        return false;
    }

    let mut res = true;
    if let Some(line) = BufReader::new(&self.ff_info_pop).lines().next() {
        let ll = match line {
            Ok(ll) => ll,
            Err(_) => return false,
        };
        if let Ok((queue_name, consumer_name, position, count_popped, id)) = scan_fmt!(&ll, "{};{};{};{};{}", String, String, u64, u32, u32) {
            // The record must belong to this queue and this consumer.
            if queue_name != self.queue.name || consumer_name != self.name {
                res = false;
            }
            self.pos_record = position;
            self.count_popped = count_popped;
            self.id = id;
        } else {
            res = false;
        }
    }

    // The value logged is the popped counter, so the label says so.
    debug!("[queue:consumer] ({}): count_popped:{}, position:{}, id:{}, success:{}", self.name, self.count_popped, self.pos_record, self.id, res);
    res
}
/// Reads and validates the header of the next record.
///
/// On failure the queue file is synced and repositioned at the consumer's
/// current record offset so that the read can be retried later.
pub fn pop_header(&mut self) -> bool {
    if self.read_header() {
        return true;
    }
    self.sync_and_set_cur_pos();
    false
}
/// Moves the consumer to the next available part once the current one has
/// been read completely.
///
/// Returns `true` when the consumer switched parts (its counters and offset
/// are reset and committed), `false` when there is nothing to switch to or a
/// part's info could not be read.
pub fn go_to_next_part(&mut self) -> bool {
    // Refresh the counters of the current part before deciding it is exhausted.
    if self.count_popped >= self.queue.count_pushed {
        if let Err(e) = self.queue.get_info_of_part(self.id, false) {
            error!("{}, queue:consumer({}):pop, queue {}{} not ready", e.as_str(), self.name, self.queue.name, self.id);
            return false;
        }
    }
    if self.count_popped < self.queue.count_pushed {
        return false;
    }

    if self.queue.id == self.id {
        self.queue.get_info_queue();
    }
    if self.queue.id <= self.id {
        return false;
    }

    // Walk forward, skipping parts that do not exist, until one can be read.
    while self.id < self.queue.id {
        self.id += 1;
        debug!("prepare next part {}", self.id);
        match self.queue.get_info_of_part(self.id, false) {
            Ok(_) => {
                warn!("use next part {}", self.id);
                break;
            },
            Err(ref e) if *e == ErrorQueue::NotFound => {
                warn!("queue:consumer({}):pop, queue {}:{} {}", self.name, self.queue.name, self.id, e.as_str());
            },
            Err(e) => {
                error!("queue:consumer({}):pop, queue {}:{} {}", self.name, self.queue.name, self.id, e.as_str());
                return false;
            },
        }
    }

    self.count_popped = 0;
    self.pos_record = 0;
    // Results of open/commit are intentionally not checked here.
    self.open(true);
    self.commit();
    if let Err(e) = self.queue.open_part(self.id) {
        error!("queue:consumer({}):pop, queue {}:{}, open part: {}", self.name, self.queue.name, self.id, e.as_str());
    }
    true
}
/// Tells whether the part described by the queue info holds no records.
fn is_empty_part(&mut self) -> bool {
    let pushed = self.queue.count_pushed;
    pushed == 0
}
fn read_header(&mut self) -> bool {
if self.go_to_next_part() {
while self.is_empty_part() {
if !self.go_to_next_part() {
break;
}
}
}
let expected_magic = match self.queue.part_format {
FormatVersion::V3 => MAGIC_MARKER_V3,
FormatVersion::V2 => MAGIC_MARKER,
};
let mut buf = vec![0; HEADER_SIZE];
match self.queue.ff_queue.read(&mut buf[..]) {
Ok(len) => {
if len < HEADER_SIZE {
return false;
}
},
Err(_) => {
error!("[queue:consumer] fail read message header");
return false;
},
}
let header = Header::create_from_buf(&buf);
if header.magic_marker != expected_magic {
error!("[queue:consumer] header is invalid: magic marker mismatch at pos={}", self.pos_record);
self.seek_next_pos();
return false;
}
if header.count_pushed > self.queue.count_pushed {
error!("[queue:consumer] header is invalid: record header count_pushed {} > queue count pushed {}", header.count_pushed, self.queue.count_pushed);
return false;
}
if header.start_pos >= self.queue.right_edge {
error!("[queue:consumer] header is invalid: start_pos {} >= right_edge {}", header.start_pos, self.queue.right_edge);
return false;
}
let msg_end = header.start_pos
.saturating_add(HEADER_SIZE as u64)
.saturating_add(header.msg_length as u64);
if msg_end > self.queue.right_edge || header.msg_length as usize > std::u32::MAX as usize / 2 {
error!(
"[queue:consumer] header is invalid: msg_length {} would overshoot right_edge {} (start_pos={})",
header.msg_length, self.queue.right_edge, header.start_pos
);
return false;
}
buf[21] = 0;
buf[22] = 0;
buf[23] = 0;
buf[24] = 0;
self.hash = Hasher::new();
self.hash.update(&buf[..]);
if header.compressed {
if header.msg_length < 4 {
error!("[queue:consumer] compressed record too short, msg_length={}", header.msg_length);
return false;
}
let mut lb = [0u8; 4];
match self.queue.ff_queue.read(&mut lb) {
Ok(4) => {},
_ => return false,
}
self.hash.update(&lb);
let orig = u32::from_ne_bytes(lb);
if orig as usize > std::u32::MAX as usize / 2 {
error!("[queue:consumer] compressed record has invalid original length {}", orig);
return false;
}
self.uncompressed_len = orig;
} else {
self.uncompressed_len = header.msg_length;
}
self.header = header;
true
}
/// Size in bytes of the buffer needed for the body of the record whose
/// header was read last (the uncompressed length).
pub fn record_len(&self) -> usize {
    let len = self.uncompressed_len;
    len as usize
}
/// Recovery helper: scans forward for the next magic marker and repositions
/// the consumer on the header that contains it.
///
/// The scan starts right after the header of the last accepted record and
/// covers at most 64 KiB. When a marker is found, the queue file is seeked
/// to the start of its header, `pos_record` is updated, the consumer is
/// marked ready and `count_popped` is incremented (the damaged record is
/// counted as consumed).
///
/// Returns `false` when seeking/reading fails or no marker is found.
pub fn seek_next_pos(&mut self) -> bool {
    // Offset of the magic marker inside a record header.
    const MARKER_OFFSET_IN_HEADER: u64 = 12;

    let from = self.header.start_pos + HEADER_SIZE as u64;
    warn!("[queue:consumer] abnormal situation: seek next record, from pos={}", from);
    if let Err(e) = self.queue.ff_queue.seek(SeekFrom::Start(from)) {
        error!("[queue:consumer] fail seek in queue, err={:?}", e);
        return false;
    }

    let mut buf = vec![0; 65536];
    let read_len = match self.queue.ff_queue.read(&mut buf[..]) {
        Ok(len) => len,
        Err(_) => {
            error!("[queue:consumer] seek_next_pos: fail to read queue");
            return false;
        },
    };

    let marker = match self.queue.part_format {
        FormatVersion::V3 => MAGIC_MARKER_V3_BYTES,
        FormatVersion::V2 => MAGIC_MARKER_BYTES,
    };
    let needle = &marker[..];

    // A sliding-window comparison finds every occurrence, including ones that
    // a restart-from-the-current-byte matcher would skip when the marker has
    // a repeated prefix.
    let match_start = match buf[..read_len].windows(needle.len()).position(|w| w == needle) {
        Some(p) => p,
        None => return false,
    };
    // Index of the last marker byte, kept for the diagnostic below.
    let idx = match_start + needle.len() - 1;

    // The header begins MARKER_OFFSET_IN_HEADER bytes before the marker; the
    // marker's own position is derived from the marker that was searched for.
    let header_start = match from.checked_add(match_start as u64).and_then(|v| v.checked_sub(MARKER_OFFSET_IN_HEADER)) {
        Some(p) => p,
        None => {
            warn!("[queue:consumer] seek_next_pos: underflow at idx={}", idx);
            return false;
        },
    };

    if let Err(e) = self.queue.ff_queue.seek(SeekFrom::Start(header_start)) {
        error!("[queue:consumer] fail seek in queue, err={:?}", e);
        return false;
    }
    warn!(
        "[queue:consumer] next record pos={}, delta={}",
        header_start,
        header_start.saturating_sub(self.header.start_pos)
    );
    self.pos_record = header_start;
    self.is_ready = true;
    self.count_popped += 1;
    true
}
/// Syncs the queue file and seeks it back to the consumer's current record
/// offset. Both steps are best-effort: failures are only logged.
fn sync_and_set_cur_pos(&mut self) {
    let synced = self.queue.ff_queue.sync_data();
    if let Err(e) = synced {
        error!("[queue:consumer] fail sync data, err={:?}", e);
    }

    let target = SeekFrom::Start(self.pos_record);
    match self.queue.ff_queue.seek(target) {
        Ok(_) => {},
        Err(e) => {
            error!("[queue:consumer] fail seek in queue, err={:?}", e);
        },
    }
}
/// Reads the body of the record whose header was accepted by `pop_header`
/// into `msg` and verifies its CRC.
///
/// Compressed records are delegated to `pop_body_compressed`. On success the
/// consumer advances past the record and the number of bytes placed in `msg`
/// is returned.
///
/// # Errors
/// * `NotReady` - the consumer is not ready.
/// * `FailRead` - I/O error or short read on a non-tail record.
/// * `FailReadTailMessage` - short read or CRC mismatch on the last pushed
///   record; the file is repositioned at the record start.
/// * `InvalidChecksum` - CRC mismatch on a non-tail record; the consumer is
///   marked not ready.
pub fn pop_body(&mut self, msg: &mut [u8]) -> Result<usize, ErrorQueue> {
    if !self.is_ready {
        return Err(ErrorQueue::NotReady);
    }

    let record_start = self.pos_record;
    if self.header.compressed {
        return self.pop_body_compressed(msg, record_start);
    }

    // The record is the tail when everything pushed so far has been popped.
    let at_tail = self.count_popped == self.queue.count_pushed;

    let readied_size = self.queue.ff_queue.read(msg).map_err(|_| {
        error!("[queue:consumer] fail read message body");
        ErrorQueue::FailRead
    })?;

    if readied_size != msg.len() {
        if !at_tail {
            return Err(ErrorQueue::FailRead);
        }
        warn!("[queue:consumer] detected problem with 'Read Tail Message': size fail");
        let _ = self.queue.ff_queue.seek(SeekFrom::Start(record_start));
        return Err(ErrorQueue::FailReadTailMessage);
    }

    self.hash.update(msg);
    let crc32: u32 = self.hash.clone().finalize();
    if crc32 != self.header.crc {
        if at_tail {
            warn!("[queue:consumer] detected problem with 'Read Tail Message': CRC fail");
            let _ = self.queue.ff_queue.seek(SeekFrom::Start(record_start));
            return Err(ErrorQueue::FailReadTailMessage);
        }
        error!("[queue:consumer] CRC fail, pos={}, record size={}", self.header.start_pos, self.header.msg_length + HEADER_SIZE as u32);
        self.is_ready = false;
        return Err(ErrorQueue::InvalidChecksum);
    }

    self.pos_record = record_start + HEADER_SIZE as u64 + readied_size as u64;
    self.count_popped += 1;
    Ok(readied_size)
}
fn pop_body_compressed(&mut self, msg: &mut [u8], record_start: u64) -> Result<usize, ErrorQueue> {
let stored = self.header.msg_length as usize - 4;
let mut cbuf = vec![0u8; stored];
let readied = match self.queue.ff_queue.read(&mut cbuf) {
Ok(n) => n,
Err(_) => {
error!("[queue:consumer] fail read compressed body");
return Err(ErrorQueue::FailRead);
},
};
if readied != stored {
if self.count_popped == self.queue.count_pushed {
warn!("[queue:consumer] detected problem with 'Read Tail Message': size fail");
let _ = self.queue.ff_queue.seek(SeekFrom::Start(record_start));
return Err(ErrorQueue::FailReadTailMessage);
}
return Err(ErrorQueue::FailRead);
}
self.hash.update(&cbuf);
let crc32: u32 = self.hash.clone().finalize();
if crc32 != self.header.crc {
if self.count_popped == self.queue.count_pushed {
warn!("[queue:consumer] detected problem with 'Read Tail Message': CRC fail");
let _ = self.queue.ff_queue.seek(SeekFrom::Start(record_start));
return Err(ErrorQueue::FailReadTailMessage);
}
error!("[queue:consumer] CRC fail, pos={}, record size={}", self.header.start_pos, self.header.msg_length + HEADER_SIZE as u32);
self.is_ready = false;
return Err(ErrorQueue::InvalidChecksum);
}
if !self.part_dict.ensure_loaded(&self.base_path, &self.queue.name, self.queue.id) {
error!("[queue:consumer] compressed record but dictionary unavailable, pos={}", record_start);
self.is_ready = false;
return Err(ErrorQueue::InvalidChecksum);
}
let expected = msg.len();
let written = match self.part_dict.decompress(&cbuf, msg) {
Ok(n) => n,
Err(()) => {
error!("[queue:consumer] decompress failed, pos={}", record_start);
self.is_ready = false;
return Err(ErrorQueue::InvalidChecksum);
},
};
if written != expected {
error!("[queue:consumer] decompressed size {} != expected {}", written, expected);
self.is_ready = false;
return Err(ErrorQueue::InvalidChecksum);
}
self.pos_record = record_start + HEADER_SIZE as u64 + 4 + stored as u64;
self.count_popped += 1;
Ok(written)
}
/// Persists the consumer state (`queue;consumer;position;count_popped;id`)
/// by truncating the info file and rewriting it from the start.
///
/// Returns the consumer's readiness flag on success; on any I/O failure the
/// consumer is marked not ready and `false` is returned.
pub fn commit(&mut self) -> bool {
    let payload = format!("{};{};{};{};{}\n", self.queue.name, self.name, self.pos_record, self.count_popped, self.id);

    match self.ff_info_pop.set_len(0) {
        Ok(()) => {},
        Err(e) => {
            error!("[queue:consumer] commit set_len err={}", e);
            self.is_ready = false;
            return false;
        },
    }

    match self.ff_info_pop.seek(SeekFrom::Start(0)) {
        Ok(_) => {},
        Err(e) => {
            error!("[queue:consumer] commit seek err={}", e);
            self.is_ready = false;
            return false;
        },
    }

    match self.ff_info_pop.write_all(payload.as_bytes()) {
        Ok(()) => {},
        Err(e) => {
            error!("[queue:consumer] commit write err={}", e);
            self.is_ready = false;
            return false;
        },
    }

    self.is_ready
}
}