use crate::info::ModuleInfo;
use crate::signals::sys_sig_listener;
use crate::api::IndvOp;
use chrono::Local;
use crossbeam_channel::{select, tick, Receiver};
use env_logger::Builder;
use ini::Ini;
use nng::options::protocol::pubsub::Subscribe;
use nng::options::Options;
use nng::options::RecvTimeout;
use nng::{Protocol, Socket};
use std::io::Write;
use std::str::FromStr;
use std::time::Duration;
use std::time::Instant;
use std::{env, thread, time};
use v_individual_model::onto::individual::{Individual, RawObj};
use v_individual_model::onto::parser::parse_raw;
use v_queue::consumer::*;
use v_queue::record::*;
#[derive(Debug)]
#[repr(u8)]
pub enum PrepareError {
Fatal = 101,
Recoverable = 102,
}
const NOTIFY_CHANNEL_RECONNECT_TIMEOUT: u64 = 300;
pub struct Module {
pub(crate) queue_prepared_count: i64,
notify_channel_url: String,
pub(crate) is_ready_notify_channel: bool,
notify_channel_read_timeout: Option<u64>,
pub(crate) max_timeout_between_batches: Option<u64>,
pub(crate) min_batch_size_to_cancel_timeout: Option<u32>,
pub max_batch_size: Option<u32>,
pub(crate) subsystem_id: Option<i64>,
pub(crate) syssig_ch: Option<Receiver<i32>>,
pub name: String,
pub onto_types: Vec<String>,
}
impl Default for Module {
fn default() -> Self {
Module::create(None, "")
}
}
impl Module {
pub fn new_with_name(name: &str) -> Self {
Module::create(None, name)
}
pub fn create(module_id: Option<i64>, module_name: &str) -> Self {
let args: Vec<String> = env::args().collect();
let mut notify_channel_url = String::default();
let mut max_timeout_between_batches = None;
let mut min_batch_size_to_cancel_timeout = None;
let mut max_batch_size = None;
let mut notify_channel_read_timeout = None;
for el in args.iter() {
if el.starts_with("--max_timeout_between_batches") {
let p: Vec<&str> = el.split('=').collect();
if let Ok(v) = p[1].parse::<u64>() {
max_timeout_between_batches = Some(v);
info!("use {} = {} ms", p[0], v);
}
} else if el.starts_with("--min_batch_size_to_cancel_timeout") {
let p: Vec<&str> = el.split('=').collect();
if let Ok(v) = p[1].parse::<u32>() {
min_batch_size_to_cancel_timeout = Some(v);
info!("use {} = {}", p[0], v);
}
} else if el.starts_with("--max_batch_size") {
let p: Vec<&str> = el.split('=').collect();
if let Ok(v) = p[1].parse::<u32>() {
max_batch_size = Some(v);
println!("use {} = {}", p[0], v);
}
} else if el.starts_with("--notify_channel_read_timeout") {
let p: Vec<&str> = el.split('=').collect();
if let Ok(v) = p[1].parse::<u64>() {
notify_channel_read_timeout = Some(v);
info!("use {} = {} ms", p[0], v);
}
} else if el.starts_with("--notify_channel_url") {
let p: Vec<&str> = el.split('=').collect();
notify_channel_url = p[1].to_owned();
}
}
if notify_channel_url.is_empty() {
if let Some(s) = Module::get_property("notify_channel_url") {
notify_channel_url = s;
}
}
let onto_types = vec![
"rdfs:Class",
"owl:Class",
"rdfs:Datatype",
"owl:Ontology",
"rdf:Property",
"owl:DatatypeProperty",
"owl:ObjectProperty",
"owl:OntologyProperty",
"owl:AnnotationProperty",
"v-ui:PropertySpecification",
"v-ui:DatatypePropertySpecification",
"v-ui:ObjectPropertySpecification",
"v-ui:TemplateSpecification",
"v-ui:ClassModel",
]
.into_iter()
.map(|s| s.to_string())
.collect();
Module {
queue_prepared_count: 0,
notify_channel_url,
is_ready_notify_channel: false,
max_timeout_between_batches,
min_batch_size_to_cancel_timeout,
max_batch_size,
subsystem_id: module_id,
notify_channel_read_timeout,
syssig_ch: None,
name: module_name.to_owned(),
onto_types,
}
}
pub fn new() -> Self {
Module::create(None, "")
}
pub fn is_content_onto(
&self,
cmd: IndvOp,
new_state: &mut Individual,
prev_state: &mut Individual,
) -> bool {
if cmd != IndvOp::Remove {
if new_state.any_exists_v("rdf:type", &self.onto_types) {
return true;
}
} else if prev_state.any_exists_v("rdf:type", &self.onto_types) {
return true;
}
false
}
pub fn get_property<T: FromStr>(in_param: &str) -> Option<T> {
let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
let aliases = conf
.section(Some("alias"))
.expect("fail parse veda.properties, section [alias]");
let args: Vec<String> = env::args().collect();
let params = [in_param.replace('_', "-"), in_param.replace('-', "_")];
for el in args.iter() {
for param in ¶ms {
if el.starts_with(&format!("--{}", param)) {
let p: Vec<&str> = el.split('=').collect();
if p.len() == 2 {
let v = p[1].trim();
let val = if let Some(a) = aliases.get(v) {
info!("use arg --{}={}, alias={}", param, a, v);
a
} else {
info!("use arg --{}={}", param, v);
v
};
return val.parse().ok();
}
}
}
}
let section = conf.section(None::<String>).expect("fail parse veda.properties");
if let Some(v) = section.get(in_param) {
let mut val = v.trim().to_owned();
if val.starts_with('$') {
if let Ok(val4var) = env::var(val.strip_prefix('$').unwrap_or_default()) {
info!("get env variable [{}]", val);
val = val4var;
} else {
info!("not found env variable {}", val);
return None;
}
}
let res = if let Some(a) = aliases.get(&val) {
info!("use param [{}]={}, alias={}", in_param, a, val);
a
} else {
info!("use param [{}]={}", in_param, val);
&val
};
return res.parse().ok();
}
error!("param [{}] not found", in_param);
None
}
pub fn connect_to_notify_channel(&mut self) -> Option<Socket> {
if !self.is_ready_notify_channel && !self.notify_channel_url.is_empty() {
let soc = Socket::new(Protocol::Sub0).unwrap();
let timeout = if let Some(t) = self.notify_channel_read_timeout {
t
} else {
1000
};
if let Err(e) = soc.set_opt::<RecvTimeout>(Some(Duration::from_millis(timeout))) {
error!("fail set timeout, {} err={}", self.notify_channel_url, e);
return None;
}
if let Err(e) = soc.dial(&self.notify_channel_url) {
error!("fail connect to, {} err={}", self.notify_channel_url, e);
return None;
} else {
let all_topics = vec![];
if let Err(e) = soc.set_opt::<Subscribe>(all_topics) {
error!("fail subscribe, {} err={}", self.notify_channel_url, e);
soc.close();
self.is_ready_notify_channel = false;
return None;
} else {
info!(
"success subscribe on queue changes: {}",
self.notify_channel_url
);
self.is_ready_notify_channel = true;
return Some(soc);
}
}
}
None
}
pub fn listen_queue_raw<T, B>(
&mut self,
queue_consumer: &mut Consumer,
module_context: &mut T,
before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
prepare: &mut fn(&mut B, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>,
after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
backend: &mut B,
) {
self.listen_queue_comb(
queue_consumer,
module_context,
before_batch,
Some(prepare),
None,
after_batch,
heartbeat,
backend,
)
}
pub fn listen_queue<T, B>(
&mut self,
queue_consumer: &mut Consumer,
module_context: &mut T,
before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
prepare: &mut fn(&mut B, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
backend: &mut B,
) {
self.listen_queue_comb(
queue_consumer,
module_context,
before_batch,
None,
Some(prepare),
after_batch,
heartbeat,
backend,
)
}
fn listen_queue_comb<T, B>(
&mut self,
queue_consumer: &mut Consumer,
module_context: &mut T,
before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
prepare_raw: Option<&mut fn(&mut B, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>>,
prepare_indv: Option<
&mut fn(&mut B, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
>,
after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
backend: &mut B,
) {
if let Ok(ch) = sys_sig_listener() {
self.syssig_ch = Some(ch);
}
let mut soc = None;
let mut count_timeout_error = 0;
let mut prev_batch_time = Instant::now();
let update = tick(Duration::from_millis(1));
loop {
if let Some(qq) = &self.syssig_ch {
select! {
recv(update) -> _ => {
}
recv(qq) -> _ => {
info!("queue {}/{}, part:{}, pos:{}", queue_consumer.queue.base_path, queue_consumer.name, queue_consumer.id, queue_consumer.count_popped);
info!("Exit");
std::process::exit(exitcode::OK);
}
}
}
if let Err(PrepareError::Fatal) = heartbeat(backend, module_context) {
error!("heartbeat: found fatal error, stop listen queue");
break;
}
if soc.is_none() {
soc = self.connect_to_notify_channel();
if soc.is_none() {
thread::sleep(time::Duration::from_millis(NOTIFY_CHANNEL_RECONNECT_TIMEOUT));
info!("sleep {} ms", NOTIFY_CHANNEL_RECONNECT_TIMEOUT);
}
}
if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
error!(
"{} get_info_of_part {}: {}",
self.queue_prepared_count,
queue_consumer.id,
e.as_str()
);
continue;
}
let size_batch = queue_consumer.get_batch_size();
let mut max_size_batch = size_batch;
if let Some(m) = self.max_batch_size {
max_size_batch = m;
}
if size_batch > 0 {
debug!("queue: batch size={}", size_batch);
if let Some(new_size) = before_batch(backend, module_context, size_batch) {
max_size_batch = new_size;
}
}
let mut prepared_batch_size = 0;
for _it in 0..max_size_batch {
if !queue_consumer.pop_header() {
break;
}
let mut raw = RawObj::new(vec![0; queue_consumer.record_len()]);
if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
match e {
ErrorQueue::FailReadTailMessage => {
break;
}
ErrorQueue::InvalidChecksum => {
error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
queue_consumer.seek_next_pos();
break;
}
_ => {
error!(
"{} get msg from queue: {}",
self.queue_prepared_count,
e.as_str()
);
break;
}
}
}
let mut need_commit = true;
if let Some(&mut f) = prepare_raw {
match f(backend, module_context, &raw, queue_consumer) {
Err(e) => {
if let PrepareError::Fatal = e {
warn!("prepare: found fatal error, stop listen queue");
return;
}
}
Ok(b) => {
need_commit = b;
}
}
}
if let Some(&mut f) = prepare_indv {
let mut queue_element = Individual::new_raw(raw);
if parse_raw(&mut queue_element).is_ok() {
let mut is_processed = true;
if let Some(assigned_subsystems) =
queue_element.get_first_integer("assigned_subsystems")
{
if assigned_subsystems > 0 {
if let Some(my_subsystem_id) = self.subsystem_id {
if assigned_subsystems & my_subsystem_id == 0 {
is_processed = false;
}
} else {
is_processed = false;
}
}
}
if is_processed {
match f(backend, module_context, &mut queue_element, queue_consumer) {
Err(e) => {
if let PrepareError::Fatal = e {
warn!("prepare: found fatal error, stop listen queue");
return;
}
}
Ok(b) => {
need_commit = b;
}
}
}
}
}
if need_commit {
queue_consumer.commit();
}
self.queue_prepared_count += 1;
if self.queue_prepared_count % 1000 == 0 {
info!("get from queue, count: {}", self.queue_prepared_count);
}
prepared_batch_size += 1;
}
if size_batch > 0 {
match after_batch(backend, module_context, prepared_batch_size) {
Ok(b) => {
if b {
queue_consumer.commit();
}
}
Err(e) => {
if let PrepareError::Fatal = e {
warn!("after_batch: found fatal error, stop listen queue");
return;
}
}
}
}
if prepared_batch_size == size_batch {
if let Some(s) = &soc {
let wmsg = s.recv();
if let Err(e) = wmsg {
debug!("fail recv from queue notify channel, err={:?}", e);
if count_timeout_error > 0 && size_batch > 0 {
warn!("queue changed but we not received notify message, need reconnect...");
self.is_ready_notify_channel = false;
count_timeout_error += 1;
}
} else {
count_timeout_error = 0;
}
}
}
if let Some(t) = self.max_timeout_between_batches {
let delta = prev_batch_time.elapsed().as_millis() as u64;
if let Some(c) = self.min_batch_size_to_cancel_timeout {
if prepared_batch_size < c && delta < t {
thread::sleep(time::Duration::from_millis(t - delta));
info!("sleep {} ms", t - delta);
}
} else if delta < t {
thread::sleep(time::Duration::from_millis(t - delta));
info!("sleep {} ms", t - delta);
}
}
prev_batch_time = Instant::now();
}
}
}
pub fn get_inner_binobj_as_individual<'a>(
queue_element: &'a mut Individual,
field_name: &str,
new_indv: &'a mut Individual,
) -> bool {
let binobj = queue_element.get_first_binobj(field_name);
if binobj.is_some() {
new_indv.set_raw(&binobj.unwrap_or_default());
if parse_raw(new_indv).is_ok() {
return true;
}
}
false
}
pub fn get_cmd(queue_element: &mut Individual) -> Option<IndvOp> {
let wcmd = queue_element.get_first_integer("cmd");
wcmd?;
Some(IndvOp::from_i64(wcmd.unwrap_or_default()))
}
pub fn init_log(module_name: &str) {
init_log_with_filter(module_name, None)
}
pub fn init_log_with_filter(module_name: &str, filter: Option<&str>) {
init_log_with_params(module_name, filter, false);
}
pub fn init_log_with_params(module_name: &str, filter: Option<&str>, with_thread_id: bool) {
let var_log_name = module_name.to_owned() + "_LOG";
let filters_str = match env::var(&var_log_name) {
Ok(val) if !val.is_empty() => {
println!("use env var: {}: {:?}", var_log_name, val);
val
}
_ => filter.unwrap_or("info").to_owned(),
};
if with_thread_id {
Builder::new()
.format(|buf, record| {
writeln!(
buf,
"{} {} [{}] - {}",
thread_id::get(),
Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
record.level(),
record.args()
)
})
.parse_filters(&filters_str)
.try_init()
.unwrap_or(())
} else {
Builder::new()
.format(|buf, record| {
writeln!(
buf,
"{} [{}] - {}",
Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
record.level(),
record.args()
)
})
.parse_filters(&filters_str)
.try_init()
.unwrap_or(())
}
}
pub fn get_info_of_module(module_name: &str) -> Option<(i64, i64)> {
let module_info = ModuleInfo::new("./data", module_name, false);
if module_info.is_err() {
error!(
"fail open info of [{}], err={:?}",
module_name,
module_info.err()
);
return None;
}
let mut info = module_info.unwrap();
info.read_info()
}