1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
use crate::module::common::sys_sig_listener;
use crate::module::module_impl::{init_log, Module, PrepareError};
use crate::onto::individual::{Individual, RawObj};
use crate::onto::parser::parse_raw;
use crossbeam_channel::{select, tick};
use nng::{Protocol, Socket};
use std::time::{Duration, Instant};
use std::{thread, time};
use v_queue::consumer::Consumer;
use v_queue::record::ErrorQueue;
pub trait VedaQueueModule {
fn before_batch(&mut self, size_batch: u32) -> Option<u32>;
fn prepare(&mut self, queue_element: &mut Individual) -> Result<bool, PrepareError>;
fn after_batch(&mut self, prepared_batch_size: u32) -> Result<bool, PrepareError>;
fn heartbeat(&mut self) -> Result<(), PrepareError>;
fn before_start(&mut self);
fn before_exit(&mut self);
}
impl Module {
pub fn prepare_queue(&mut self, veda_module: &mut dyn VedaQueueModule) {
init_log(&self.name);
let queue_consumer = &mut Consumer::new("./data/queue", &self.name, "individuals-flow").expect("!!!!!!!!! FAIL QUEUE");
if let Ok(ch) = sys_sig_listener() {
self.syssig_ch = Some(ch);
}
let mut soc = Socket::new(Protocol::Sub0).unwrap();
let mut count_timeout_error = 0;
let mut prev_batch_time = Instant::now();
let update = tick(Duration::from_millis(1));
veda_module.before_start();
loop {
if let Some(qq) = &self.syssig_ch {
select! {
recv(update) -> _ => {
}
recv(qq) -> _ => {
info!("Exit");
veda_module.before_exit();
std::process::exit (exitcode::OK);
//break;
}
}
}
if let Err(PrepareError::Fatal) = veda_module.heartbeat() {
error!("heartbeat: found fatal error, stop listen queue");
break;
}
if let Some(s) = self.connect_to_notify_channel() {
soc = s;
}
// read queue current part info
if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
continue;
}
let size_batch = queue_consumer.get_batch_size();
let mut max_size_batch = size_batch;
if let Some(m) = self.max_batch_size {
max_size_batch = m;
}
if size_batch > 0 {
debug!("queue: batch size={}", size_batch);
if let Some(new_size) = veda_module.before_batch(size_batch) {
max_size_batch = new_size;
}
}
let mut prepared_batch_size = 0;
for _it in 0..max_size_batch {
// пробуем взять из очереди заголовок сообщения
if !queue_consumer.pop_header() {
break;
}
let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
// заголовок взят успешно, занесем содержимое сообщения в структуру Individual
if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
match e {
ErrorQueue::FailReadTailMessage => {
break;
},
ErrorQueue::InvalidChecksum => {
error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
queue_consumer.seek_next_pos();
break;
},
_ => {
error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
break;
},
}
}
let mut need_commit = true;
/*
if let Some(&mut f) = prepare_raw {
match f(self, module, &raw, queue_consumer) {
Err(e) => {
if let PrepareError::Fatal = e {
warn!("prepare: found fatal error, stop listen queue");
return;
}
}
Ok(b) => {
need_commit = b;
}
}
}
*/
{
let mut queue_element = Individual::new_raw(raw);
if parse_raw(&mut queue_element).is_ok() {
let mut is_processed = true;
if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
if assigned_subsystems > 0 {
if let Some(my_subsystem_id) = self.subsystem_id {
if assigned_subsystems & my_subsystem_id == 0 {
is_processed = false;
}
} else {
is_processed = false;
}
}
}
if is_processed {
match veda_module.prepare(&mut queue_element) {
Err(e) => {
if let PrepareError::Fatal = e {
warn!("prepare: found fatal error, stop listen queue");
return;
}
},
Ok(b) => {
need_commit = b;
},
}
}
}
}
if need_commit {
queue_consumer.commit();
}
self.queue_prepared_count += 1;
if self.queue_prepared_count % 1000 == 0 {
info!("get from queue, count: {}", self.queue_prepared_count);
}
prepared_batch_size += 1;
}
if size_batch > 0 {
match veda_module.after_batch(prepared_batch_size) {
Ok(b) => {
if b {
queue_consumer.commit();
}
},
Err(e) => {
if let PrepareError::Fatal = e {
warn!("after_batch: found fatal error, stop listen queue");
return;
}
},
}
}
if prepared_batch_size == size_batch {
let wmsg = soc.recv();
if let Err(e) = wmsg {
debug!("fail recv from queue notify channel, err={:?}", e);
if count_timeout_error > 0 && size_batch > 0 {
warn!("queue changed but we not received notify message, need reconnect...");
self.is_ready_notify_channel = false;
count_timeout_error += 1;
}
} else {
count_timeout_error = 0;
}
}
if let Some(t) = self.max_timeout_between_batches {
let delta = prev_batch_time.elapsed().as_millis() as u64;
if let Some(c) = self.min_batch_size_to_cancel_timeout {
if prepared_batch_size < c && delta < t {
thread::sleep(time::Duration::from_millis(t - delta));
info!("sleep {} ms", t - delta);
}
} else if delta < t {
thread::sleep(time::Duration::from_millis(t - delta));
info!("sleep {} ms", t - delta);
}
}
prev_batch_time = Instant::now();
}
}
}