1use crate::module::common::sys_sig_listener;
2use crate::module::info::ModuleInfo;
3use crate::module::veda_backend::Backend;
4use v_individual_model::onto::individual::{Individual, RawObj};
5use v_individual_model::onto::parser::parse_raw;
6use v_storage::{StorageId, VStorage};
7use crate::v_api::api_client::IndvOp;
8use chrono::Local;
9use crossbeam_channel::{select, tick, Receiver};
10use env_logger::Builder;
11use ini::Ini;
12use nng::options::protocol::pubsub::Subscribe;
13use nng::options::Options;
14use nng::options::RecvTimeout;
15use nng::{Protocol, Socket};
16use std::io::Write;
17use std::str::FromStr;
18use std::time::Duration;
19use std::time::Instant;
20use std::{env, thread, time};
21use v_queue::{consumer::*, record::*};
22
23#[derive(Debug)]
24#[repr(u8)]
25pub enum PrepareError {
26 Fatal = 101,
27 Recoverable = 102,
28}
29
30const NOTIFY_CHANNEL_RECONNECT_TIMEOUT: u64 = 300;
31
32pub struct Module {
33 pub(crate) queue_prepared_count: i64,
34 notify_channel_url: String,
35 pub(crate) is_ready_notify_channel: bool,
36 notify_channel_read_timeout: Option<u64>,
37 pub(crate) max_timeout_between_batches: Option<u64>,
38 pub(crate) min_batch_size_to_cancel_timeout: Option<u32>,
39 pub max_batch_size: Option<u32>,
40 pub(crate) subsystem_id: Option<i64>,
41 pub(crate) syssig_ch: Option<Receiver<i32>>,
42 pub(crate) name: String,
43 onto_types: Vec<String>,
44}
45
46impl Default for Module {
47 fn default() -> Self {
48 Module::create(None, "")
49 }
50}
51
52impl Module {
53 pub fn new_with_name(name: &str) -> Self {
54 Module::create(None, name)
55 }
56
57 pub fn create(module_id: Option<i64>, module_name: &str) -> Self {
58 let args: Vec<String> = env::args().collect();
59
60 let mut notify_channel_url = String::default();
61 let mut max_timeout_between_batches = None;
62 let mut min_batch_size_to_cancel_timeout = None;
63 let mut max_batch_size = None;
64 let mut notify_channel_read_timeout = None;
65
66 for el in args.iter() {
67 if el.starts_with("--max_timeout_between_batches") {
68 let p: Vec<&str> = el.split('=').collect();
69 if let Ok(v) = p[1].parse::<u64>() {
70 max_timeout_between_batches = Some(v);
71 info!("use {} = {} ms", p[0], v);
72 }
73 } else if el.starts_with("--min_batch_size_to_cancel_timeout") {
74 let p: Vec<&str> = el.split('=').collect();
75 if let Ok(v) = p[1].parse::<u32>() {
76 min_batch_size_to_cancel_timeout = Some(v);
77 info!("use {} = {}", p[0], v);
78 }
79 } else if el.starts_with("--max_batch_size") {
80 let p: Vec<&str> = el.split('=').collect();
81 if let Ok(v) = p[1].parse::<u32>() {
82 max_batch_size = Some(v);
83 println!("use {} = {}", p[0], v);
84 }
85 } else if el.starts_with("--notify_channel_read_timeout") {
86 let p: Vec<&str> = el.split('=').collect();
87 if let Ok(v) = p[1].parse::<u64>() {
88 notify_channel_read_timeout = Some(v);
89 info!("use {} = {} ms", p[0], v);
90 }
91 } else if el.starts_with("--notify_channel_url") {
92 let p: Vec<&str> = el.split('=').collect();
93 notify_channel_url = p[1].to_owned();
94 }
95 }
96
97 if notify_channel_url.is_empty() {
98 if let Some(s) = Module::get_property("notify_channel_url") {
99 notify_channel_url = s
100 }
101 }
102
103 let onto_types = vec![
104 "rdfs:Class",
105 "owl:Class",
106 "rdfs:Datatype",
107 "owl:Ontology",
108 "rdf:Property",
109 "owl:DatatypeProperty",
110 "owl:ObjectProperty",
111 "owl:OntologyProperty",
112 "owl:AnnotationProperty",
113 "v-ui:PropertySpecification",
114 "v-ui:DatatypePropertySpecification",
115 "v-ui:ObjectPropertySpecification",
116 "v-ui:TemplateSpecification",
117 "v-ui:ClassModel",
118 ];
119
120 Module {
121 queue_prepared_count: 0,
122 notify_channel_url,
123 is_ready_notify_channel: false,
124 max_timeout_between_batches,
125 min_batch_size_to_cancel_timeout,
126 max_batch_size,
127 subsystem_id: module_id,
128 notify_channel_read_timeout,
129 syssig_ch: None,
130 name: module_name.to_owned(),
131 onto_types: onto_types.iter().map(|x| x.to_string()).collect(),
132 }
133 }
134
135 pub fn new() -> Self {
136 Module::create(None, "")
137 }
138
139 pub fn get_property<T: FromStr>(in_param: &str) -> Option<T> {
142 let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
144
145 let aliases = conf.section(Some("alias")).expect("fail parse veda.properties, section [alias]");
147
148 let args: Vec<String> = env::args().collect();
150
151 let params = [in_param.replace('_', "-"), in_param.replace('-', "_")];
152
153 for el in args.iter() {
155 for param in ¶ms {
156 if el.starts_with(&format!("--{}", param)) {
157 let p: Vec<&str> = el.split('=').collect();
159
160 if p.len() == 2 {
162 let v = p[1].trim();
163 let val = if let Some(a) = aliases.get(v) {
164 info!("use arg --{}={}, alias={}", param, a, v);
165 a
166 } else {
167 info!("use arg --{}={}", param, v);
168 v
169 };
170
171 return val.parse().ok();
172 }
173 }
174 }
175 }
176
177 let section = conf.section(None::<String>).expect("fail parse veda.properties");
179
180 if let Some(v) = section.get(in_param) {
181 let mut val = v.trim().to_owned();
183
184 if val.starts_with('$') {
186 if let Ok(val4var) = env::var(val.strip_prefix('$').unwrap_or_default()) {
187 info!("get env variable [{}]", val);
188 val = val4var;
189 } else {
190 info!("not found env variable {}", val);
191 return None;
192 }
193 }
194
195 let res = if let Some(a) = aliases.get(&val) {
197 info!("use param [{}]={}, alias={}", in_param, a, val);
198 a
199 } else {
200 info!("use param [{}]={}", in_param, val);
201 &val
202 };
203
204 return res.parse().ok();
206 }
207
208 error!("param [{}] not found", in_param);
210 None
211 }
212
213 pub fn is_content_onto(&self, cmd: IndvOp, new_state: &mut Individual, prev_state: &mut Individual) -> bool {
214 if cmd != IndvOp::Remove {
215 if new_state.any_exists_v("rdf:type", &self.onto_types) {
216 return true;
217 }
218 } else if prev_state.any_exists_v("rdf:type", &self.onto_types) {
219 return true;
220 }
221 false
222 }
223
224 pub fn get_sys_ticket_id_from_db(storage: &mut VStorage) -> Result<String, i32> {
225 let mut indv = Individual::default();
226 if storage.get_individual_from_storage(StorageId::Tickets, "systicket", &mut indv).is_ok() {
227 if let Some(c) = indv.get_first_literal("v-s:resource") {
228 return Ok(c);
229 }
230 }
231 Err(-1)
232 }
233
234 pub(crate) fn connect_to_notify_channel(&mut self) -> Option<Socket> {
235 if !self.is_ready_notify_channel && !self.notify_channel_url.is_empty() {
236 let soc = Socket::new(Protocol::Sub0).unwrap();
237
238 let timeout = if let Some(t) = self.notify_channel_read_timeout {
239 t
240 } else {
241 1000
242 };
243
244 if let Err(e) = soc.set_opt::<RecvTimeout>(Some(Duration::from_millis(timeout))) {
245 error!("fail set timeout, {} err={}", self.notify_channel_url, e);
246 return None;
247 }
248
249 if let Err(e) = soc.dial(&self.notify_channel_url) {
250 error!("fail connect to, {} err={}", self.notify_channel_url, e);
251 return None;
252 } else {
253 let all_topics = vec![];
254 if let Err(e) = soc.set_opt::<Subscribe>(all_topics) {
255 error!("fail subscribe, {} err={}", self.notify_channel_url, e);
256 soc.close();
257 self.is_ready_notify_channel = false;
258 return None;
259 } else {
260 info!("success subscribe on queue changes: {}", self.notify_channel_url);
261 self.is_ready_notify_channel = true;
262 return Some(soc);
263 }
264 }
265 }
266 None
267 }
268
269 pub fn listen_queue_raw<T>(
270 &mut self,
271 queue_consumer: &mut Consumer,
272 module_context: &mut T,
273 before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
274 prepare: &mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>,
275 after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
276 heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
277 backend: &mut Backend,
278 ) {
279 self.listen_queue_comb(queue_consumer, module_context, before_batch, Some(prepare), None, after_batch, heartbeat, backend)
280 }
281
282 pub fn listen_queue<T>(
283 &mut self,
284 queue_consumer: &mut Consumer,
285 module_context: &mut T,
286 before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
287 prepare: &mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
288 after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
289 heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
290 backend: &mut Backend,
291 ) {
292 self.listen_queue_comb(queue_consumer, module_context, before_batch, None, Some(prepare), after_batch, heartbeat, backend)
293 }
294
295 fn listen_queue_comb<T>(
296 &mut self,
297 queue_consumer: &mut Consumer,
298 module_context: &mut T,
299 before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
300 prepare_raw: Option<&mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>>,
301 prepare_indv: Option<&mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>>,
302 after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
303 heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
304 backend: &mut Backend,
305 ) {
306 if let Ok(ch) = sys_sig_listener() {
307 self.syssig_ch = Some(ch);
308 }
309
310 let mut soc = None;
311 let mut count_timeout_error = 0;
312
313 let mut prev_batch_time = Instant::now();
314 let update = tick(Duration::from_millis(1));
315 loop {
316 if let Some(qq) = &self.syssig_ch {
317 select! {
318 recv(update) -> _ => {
319 }
320 recv(qq) -> _ => {
321 info!("queue {}/{}, part:{}, pos:{}", queue_consumer.queue.base_path, queue_consumer.name, queue_consumer.id, queue_consumer.count_popped);
322 info!("Exit");
323 std::process::exit (exitcode::OK);
324 }
326 }
327 }
328
329 if let Err(PrepareError::Fatal) = heartbeat(backend, module_context) {
330 error!("heartbeat: found fatal error, stop listen queue");
331 break;
332 }
333
334 if soc.is_none() {
335 soc = self.connect_to_notify_channel();
336 if soc.is_none() {
337 thread::sleep(time::Duration::from_millis(NOTIFY_CHANNEL_RECONNECT_TIMEOUT));
338 info!("sleep {} ms", NOTIFY_CHANNEL_RECONNECT_TIMEOUT);
339 }
340 }
341
342 if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
344 error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
345 continue;
346 }
347
348 let size_batch = queue_consumer.get_batch_size();
349
350 let mut max_size_batch = size_batch;
351 if let Some(m) = self.max_batch_size {
352 max_size_batch = m;
353 }
354
355 if size_batch > 0 {
356 debug!("queue: batch size={}", size_batch);
357 if let Some(new_size) = before_batch(backend, module_context, size_batch) {
358 max_size_batch = new_size;
359 }
360 }
361
362 let mut prepared_batch_size = 0;
363 for _it in 0..max_size_batch {
364 if !queue_consumer.pop_header() {
366 break;
367 }
368
369 let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
370
371 if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
373 match e {
374 ErrorQueue::FailReadTailMessage => {
375 break;
376 },
377 ErrorQueue::InvalidChecksum => {
378 error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
379 queue_consumer.seek_next_pos();
380 break;
381 },
382 _ => {
383 error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
384 break;
385 },
386 }
387 }
388
389 let mut need_commit = true;
390
391 if let Some(&mut f) = prepare_raw {
392 match f(backend, module_context, &raw, queue_consumer) {
393 Err(e) => {
394 if let PrepareError::Fatal = e {
395 warn!("prepare: found fatal error, stop listen queue");
396 return;
397 }
398 },
399 Ok(b) => {
400 need_commit = b;
401 },
402 }
403 }
404
405 if let Some(&mut f) = prepare_indv {
406 let mut queue_element = Individual::new_raw(raw);
407 if parse_raw(&mut queue_element).is_ok() {
408 let mut is_processed = true;
409 if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
410 if assigned_subsystems > 0 {
411 if let Some(my_subsystem_id) = self.subsystem_id {
412 if assigned_subsystems & my_subsystem_id == 0 {
413 is_processed = false;
414 }
415 } else {
416 is_processed = false;
417 }
418 }
419 }
420
421 if is_processed {
422 match f(backend, module_context, &mut queue_element, queue_consumer) {
423 Err(e) => {
424 if let PrepareError::Fatal = e {
425 warn!("prepare: found fatal error, stop listen queue");
426 return;
427 }
428 },
429 Ok(b) => {
430 need_commit = b;
431 },
432 }
433 }
434 }
435 }
436
437 if need_commit {
438 queue_consumer.commit();
439 }
440
441 self.queue_prepared_count += 1;
442
443 if self.queue_prepared_count % 1000 == 0 {
444 info!("get from queue, count: {}", self.queue_prepared_count);
445 }
446 prepared_batch_size += 1;
447 }
448
449 if size_batch > 0 {
450 match after_batch(backend, module_context, prepared_batch_size) {
451 Ok(b) => {
452 if b {
453 queue_consumer.commit();
454 }
455 },
456 Err(e) => {
457 if let PrepareError::Fatal = e {
458 warn!("after_batch: found fatal error, stop listen queue");
459 return;
460 }
461 },
462 }
463 }
464
465 if prepared_batch_size == size_batch {
466 if let Some(s) = &soc {
467 let wmsg = s.recv();
468 if let Err(e) = wmsg {
469 debug!("fail recv from queue notify channel, err={:?}", e);
470
471 if count_timeout_error > 0 && size_batch > 0 {
472 warn!("queue changed but we not received notify message, need reconnect...");
473 self.is_ready_notify_channel = false;
474 count_timeout_error += 1;
475 }
476 } else {
477 count_timeout_error = 0;
478 }
479 }
480 }
481
482 if let Some(t) = self.max_timeout_between_batches {
483 let delta = prev_batch_time.elapsed().as_millis() as u64;
484 if let Some(c) = self.min_batch_size_to_cancel_timeout {
485 if prepared_batch_size < c && delta < t {
486 thread::sleep(time::Duration::from_millis(t - delta));
487 info!("sleep {} ms", t - delta);
488 }
489 } else if delta < t {
490 thread::sleep(time::Duration::from_millis(t - delta));
491 info!("sleep {} ms", t - delta);
492 }
493 }
494
495 prev_batch_time = Instant::now();
496 }
497 }
498}
499
500pub fn get_inner_binobj_as_individual<'a>(queue_element: &'a mut Individual, field_name: &str, new_indv: &'a mut Individual) -> bool {
501 let binobj = queue_element.get_first_binobj(field_name);
502 if binobj.is_some() {
503 new_indv.set_raw(&binobj.unwrap_or_default());
504 if parse_raw(new_indv).is_ok() {
505 return true;
506 }
507 }
508 false
509}
510
511pub fn get_cmd(queue_element: &mut Individual) -> Option<IndvOp> {
512 let wcmd = queue_element.get_first_integer("cmd");
513 wcmd?;
514
515 Some(IndvOp::from_i64(wcmd.unwrap_or_default()))
516}
517
518pub fn init_log(module_name: &str) {
519 init_log_with_filter(module_name, None)
520}
521
522pub fn init_log_with_filter(module_name: &str, filter: Option<&str>) {
523 init_log_with_params(module_name, filter, false);
524}
525
526pub fn init_log_with_params(module_name: &str, filter: Option<&str>, with_thread_id: bool) {
527 let var_log_name = module_name.to_owned() + "_LOG";
528 match std::env::var_os(&var_log_name) {
529 Some(val) => println!("use env var: {}: {:?}", var_log_name, val.to_str()),
530 None => std::env::set_var(&var_log_name, "info"),
531 }
532
533 let filters_str = if let Some(f) = filter {
534 f.to_owned()
535 } else {
536 env::var(var_log_name).unwrap_or_default()
537 };
538
539 if with_thread_id {
540 Builder::new()
541 .format(|buf, record| {
542 writeln!(buf, "{} {} [{}] - {}", thread_id::get(), Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), record.level(), record.args())
543 })
544 .parse_filters(&filters_str)
545 .try_init()
546 .unwrap_or(())
547 } else {
548 Builder::new()
549 .format(|buf, record| writeln!(buf, "{} [{}] - {}", Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), record.level(), record.args()))
550 .parse_filters(&filters_str)
551 .try_init()
552 .unwrap_or(())
553 }
554}
555
556pub fn get_info_of_module(module_name: &str) -> Option<(i64, i64)> {
557 let module_info = ModuleInfo::new("./data", module_name, false);
558 if module_info.is_err() {
559 error!("fail open info of [{}], err={:?}", module_name, module_info.err());
560 return None;
561 }
562
563 let mut info = module_info.unwrap();
564 info.read_info()
565}
566
567pub fn wait_load_ontology() -> i64 {
568 wait_module("input-onto", 1)
569}
570
571pub fn wait_module(module_name: &str, wait_op_id: i64) -> i64 {
572 if wait_op_id < 0 {
573 error!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
574 return -1;
575 }
576
577 info!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
578 loop {
579 let module_info = ModuleInfo::new("./data", module_name, false);
580 if module_info.is_err() {
581 error!("fail open info of [{}], err={:?}", module_name, module_info.err());
582 thread::sleep(time::Duration::from_millis(300));
583 continue;
584 }
585
586 let mut info = module_info.unwrap();
587 loop {
588 if let Some((_, committed)) = info.read_info() {
589 if committed >= wait_op_id {
590 info!("wait module [{}] to complete op_id={}, found commited_op_id={}", module_name, wait_op_id, committed);
591 return committed;
592 }
593 } else {
594 error!("fail read info for module [{}]", module_name);
595 }
597 thread::sleep(time::Duration::from_millis(300));
598 }
599
600 }
602
603 }