1use crate::info::ModuleInfo;
2use crate::signals::sys_sig_listener;
3use crate::api::IndvOp;
4use chrono::Local;
5use crossbeam_channel::{select, tick, Receiver};
6use env_logger::Builder;
7use ini::Ini;
8use nng::options::protocol::pubsub::Subscribe;
9use nng::options::Options;
10use nng::options::RecvTimeout;
11use nng::{Protocol, Socket};
12use std::io::Write;
13use std::str::FromStr;
14use std::time::Duration;
15use std::time::Instant;
16use std::{env, thread, time};
17use v_individual_model::onto::individual::{Individual, RawObj};
18use v_individual_model::onto::parser::parse_raw;
19use v_queue::consumer::*;
20use v_queue::record::*;
21
/// Error kinds that the queue callbacks (`prepare`, `after_batch`, `heartbeat`)
/// may report to the listening loop of [`Module`].
#[derive(Debug)]
#[repr(u8)]
pub enum PrepareError {
    /// Unrecoverable failure: the listening loop stops as soon as a callback returns it.
    Fatal = 101,
    /// Non-fatal failure: the listening loop does not stop on it and goes on working.
    Recoverable = 102,
}
28
/// Pause, in milliseconds, made by the queue listening loop after an
/// unsuccessful attempt to connect to the notify channel.
const NOTIFY_CHANNEL_RECONNECT_TIMEOUT: u64 = 300;
30
/// State and settings of a module that consumes a queue and reacts to
/// notifications about queue changes.
pub struct Module {
    /// Number of queue elements handled by the listening loop so far.
    pub(crate) queue_prepared_count: i64,
    /// URL of the notify channel; taken from `--notify_channel_url=` or from
    /// the `notify_channel_url` property. Empty means "do not connect".
    notify_channel_url: String,
    /// `true` after a successful subscription to the notify channel.
    pub(crate) is_ready_notify_channel: bool,
    /// Receive timeout of the notify socket in milliseconds (1000 when `None`).
    notify_channel_read_timeout: Option<u64>,
    /// Minimal time in milliseconds between two batches; the loop sleeps for
    /// the remainder when a batch finishes earlier.
    pub(crate) max_timeout_between_batches: Option<u64>,
    /// When the processed batch holds at least this many elements, the
    /// pause defined by `max_timeout_between_batches` is skipped.
    pub(crate) min_batch_size_to_cancel_timeout: Option<u32>,
    /// Upper bound of elements taken from the queue in one batch; the
    /// `before_batch` callback may override it.
    pub max_batch_size: Option<u32>,
    /// Bit mask matched against the `assigned_subsystems` field of a queue element.
    pub(crate) subsystem_id: Option<i64>,
    /// Channel produced by `sys_sig_listener`; a message on it terminates the process.
    pub(crate) syssig_ch: Option<Receiver<i32>>,
    /// Module name passed to the constructor.
    pub name: String,
    /// `rdf:type` values that `is_content_onto` treats as ontology content.
    pub onto_types: Vec<String>,
}
44
45impl Default for Module {
46 fn default() -> Self {
47 Module::create(None, "")
48 }
49}
50
51impl Module {
52 pub fn new_with_name(name: &str) -> Self {
53 Module::create(None, name)
54 }
55
56 pub fn create(module_id: Option<i64>, module_name: &str) -> Self {
57 let args: Vec<String> = env::args().collect();
58
59 let mut notify_channel_url = String::default();
60 let mut max_timeout_between_batches = None;
61 let mut min_batch_size_to_cancel_timeout = None;
62 let mut max_batch_size = None;
63 let mut notify_channel_read_timeout = None;
64
65 for el in args.iter() {
66 if el.starts_with("--max_timeout_between_batches") {
67 let p: Vec<&str> = el.split('=').collect();
68 if let Ok(v) = p[1].parse::<u64>() {
69 max_timeout_between_batches = Some(v);
70 info!("use {} = {} ms", p[0], v);
71 }
72 } else if el.starts_with("--min_batch_size_to_cancel_timeout") {
73 let p: Vec<&str> = el.split('=').collect();
74 if let Ok(v) = p[1].parse::<u32>() {
75 min_batch_size_to_cancel_timeout = Some(v);
76 info!("use {} = {}", p[0], v);
77 }
78 } else if el.starts_with("--max_batch_size") {
79 let p: Vec<&str> = el.split('=').collect();
80 if let Ok(v) = p[1].parse::<u32>() {
81 max_batch_size = Some(v);
82 println!("use {} = {}", p[0], v);
83 }
84 } else if el.starts_with("--notify_channel_read_timeout") {
85 let p: Vec<&str> = el.split('=').collect();
86 if let Ok(v) = p[1].parse::<u64>() {
87 notify_channel_read_timeout = Some(v);
88 info!("use {} = {} ms", p[0], v);
89 }
90 } else if el.starts_with("--notify_channel_url") {
91 let p: Vec<&str> = el.split('=').collect();
92 notify_channel_url = p[1].to_owned();
93 }
94 }
95
96 if notify_channel_url.is_empty() {
97 if let Some(s) = Module::get_property("notify_channel_url") {
98 notify_channel_url = s;
99 }
100 }
101
102 let onto_types = vec![
103 "rdfs:Class",
104 "owl:Class",
105 "rdfs:Datatype",
106 "owl:Ontology",
107 "rdf:Property",
108 "owl:DatatypeProperty",
109 "owl:ObjectProperty",
110 "owl:OntologyProperty",
111 "owl:AnnotationProperty",
112 "v-ui:PropertySpecification",
113 "v-ui:DatatypePropertySpecification",
114 "v-ui:ObjectPropertySpecification",
115 "v-ui:TemplateSpecification",
116 "v-ui:ClassModel",
117 ]
118 .into_iter()
119 .map(|s| s.to_string())
120 .collect();
121
122 Module {
123 queue_prepared_count: 0,
124 notify_channel_url,
125 is_ready_notify_channel: false,
126 max_timeout_between_batches,
127 min_batch_size_to_cancel_timeout,
128 max_batch_size,
129 subsystem_id: module_id,
130 notify_channel_read_timeout,
131 syssig_ch: None,
132 name: module_name.to_owned(),
133 onto_types,
134 }
135 }
136
137 pub fn new() -> Self {
138 Module::create(None, "")
139 }
140
141 pub fn is_content_onto(
142 &self,
143 cmd: IndvOp,
144 new_state: &mut Individual,
145 prev_state: &mut Individual,
146 ) -> bool {
147 if cmd != IndvOp::Remove {
148 if new_state.any_exists_v("rdf:type", &self.onto_types) {
149 return true;
150 }
151 } else if prev_state.any_exists_v("rdf:type", &self.onto_types) {
152 return true;
153 }
154 false
155 }
156
157 pub fn get_property<T: FromStr>(in_param: &str) -> Option<T> {
158 let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
159
160 let aliases = conf
161 .section(Some("alias"))
162 .expect("fail parse veda.properties, section [alias]");
163
164 let args: Vec<String> = env::args().collect();
165
166 let params = [in_param.replace('_', "-"), in_param.replace('-', "_")];
167
168 for el in args.iter() {
169 for param in ¶ms {
170 if el.starts_with(&format!("--{}", param)) {
171 let p: Vec<&str> = el.split('=').collect();
172
173 if p.len() == 2 {
174 let v = p[1].trim();
175 let val = if let Some(a) = aliases.get(v) {
176 info!("use arg --{}={}, alias={}", param, a, v);
177 a
178 } else {
179 info!("use arg --{}={}", param, v);
180 v
181 };
182
183 return val.parse().ok();
184 }
185 }
186 }
187 }
188
189 let section = conf.section(None::<String>).expect("fail parse veda.properties");
190
191 if let Some(v) = section.get(in_param) {
192 let mut val = v.trim().to_owned();
193
194 if val.starts_with('$') {
195 if let Ok(val4var) = env::var(val.strip_prefix('$').unwrap_or_default()) {
196 info!("get env variable [{}]", val);
197 val = val4var;
198 } else {
199 info!("not found env variable {}", val);
200 return None;
201 }
202 }
203
204 let res = if let Some(a) = aliases.get(&val) {
205 info!("use param [{}]={}, alias={}", in_param, a, val);
206 a
207 } else {
208 info!("use param [{}]={}", in_param, val);
209 &val
210 };
211
212 return res.parse().ok();
213 }
214
215 error!("param [{}] not found", in_param);
216 None
217 }
218
219 pub fn connect_to_notify_channel(&mut self) -> Option<Socket> {
220 if !self.is_ready_notify_channel && !self.notify_channel_url.is_empty() {
221 let soc = Socket::new(Protocol::Sub0).unwrap();
222
223 let timeout = if let Some(t) = self.notify_channel_read_timeout {
224 t
225 } else {
226 1000
227 };
228
229 if let Err(e) = soc.set_opt::<RecvTimeout>(Some(Duration::from_millis(timeout))) {
230 error!("fail set timeout, {} err={}", self.notify_channel_url, e);
231 return None;
232 }
233
234 if let Err(e) = soc.dial(&self.notify_channel_url) {
235 error!("fail connect to, {} err={}", self.notify_channel_url, e);
236 return None;
237 } else {
238 let all_topics = vec![];
239 if let Err(e) = soc.set_opt::<Subscribe>(all_topics) {
240 error!("fail subscribe, {} err={}", self.notify_channel_url, e);
241 soc.close();
242 self.is_ready_notify_channel = false;
243 return None;
244 } else {
245 info!(
246 "success subscribe on queue changes: {}",
247 self.notify_channel_url
248 );
249 self.is_ready_notify_channel = true;
250 return Some(soc);
251 }
252 }
253 }
254 None
255 }
256
257 pub fn listen_queue_raw<T, B>(
258 &mut self,
259 queue_consumer: &mut Consumer,
260 module_context: &mut T,
261 before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
262 prepare: &mut fn(&mut B, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>,
263 after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
264 heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
265 backend: &mut B,
266 ) {
267 self.listen_queue_comb(
268 queue_consumer,
269 module_context,
270 before_batch,
271 Some(prepare),
272 None,
273 after_batch,
274 heartbeat,
275 backend,
276 )
277 }
278
279 pub fn listen_queue<T, B>(
280 &mut self,
281 queue_consumer: &mut Consumer,
282 module_context: &mut T,
283 before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
284 prepare: &mut fn(&mut B, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
285 after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
286 heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
287 backend: &mut B,
288 ) {
289 self.listen_queue_comb(
290 queue_consumer,
291 module_context,
292 before_batch,
293 None,
294 Some(prepare),
295 after_batch,
296 heartbeat,
297 backend,
298 )
299 }
300
301 fn listen_queue_comb<T, B>(
302 &mut self,
303 queue_consumer: &mut Consumer,
304 module_context: &mut T,
305 before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
306 prepare_raw: Option<&mut fn(&mut B, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>>,
307 prepare_indv: Option<
308 &mut fn(&mut B, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
309 >,
310 after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
311 heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
312 backend: &mut B,
313 ) {
314 if let Ok(ch) = sys_sig_listener() {
315 self.syssig_ch = Some(ch);
316 }
317
318 let mut soc = None;
319 let mut count_timeout_error = 0;
320
321 let mut prev_batch_time = Instant::now();
322 let update = tick(Duration::from_millis(1));
323 loop {
324 if let Some(qq) = &self.syssig_ch {
325 select! {
326 recv(update) -> _ => {
327 }
328 recv(qq) -> _ => {
329 info!("queue {}/{}, part:{}, pos:{}", queue_consumer.queue.base_path, queue_consumer.name, queue_consumer.id, queue_consumer.count_popped);
330 info!("Exit");
331 std::process::exit(exitcode::OK);
332 }
333 }
334 }
335
336 if let Err(PrepareError::Fatal) = heartbeat(backend, module_context) {
337 error!("heartbeat: found fatal error, stop listen queue");
338 break;
339 }
340
341 if soc.is_none() {
342 soc = self.connect_to_notify_channel();
343 if soc.is_none() {
344 thread::sleep(time::Duration::from_millis(NOTIFY_CHANNEL_RECONNECT_TIMEOUT));
345 info!("sleep {} ms", NOTIFY_CHANNEL_RECONNECT_TIMEOUT);
346 }
347 }
348
349 if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
350 error!(
351 "{} get_info_of_part {}: {}",
352 self.queue_prepared_count,
353 queue_consumer.id,
354 e.as_str()
355 );
356 continue;
357 }
358
359 let size_batch = queue_consumer.get_batch_size();
360
361 let mut max_size_batch = size_batch;
362 if let Some(m) = self.max_batch_size {
363 max_size_batch = m;
364 }
365
366 if size_batch > 0 {
367 debug!("queue: batch size={}", size_batch);
368 if let Some(new_size) = before_batch(backend, module_context, size_batch) {
369 max_size_batch = new_size;
370 }
371 }
372
373 let mut prepared_batch_size = 0;
374 for _it in 0..max_size_batch {
375 if !queue_consumer.pop_header() {
376 break;
377 }
378
379 let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
380
381 if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
382 match e {
383 ErrorQueue::FailReadTailMessage => {
384 break;
385 }
386 ErrorQueue::InvalidChecksum => {
387 error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
388 queue_consumer.seek_next_pos();
389 break;
390 }
391 _ => {
392 error!(
393 "{} get msg from queue: {}",
394 self.queue_prepared_count,
395 e.as_str()
396 );
397 break;
398 }
399 }
400 }
401
402 let mut need_commit = true;
403
404 if let Some(&mut f) = prepare_raw {
405 match f(backend, module_context, &raw, queue_consumer) {
406 Err(e) => {
407 if let PrepareError::Fatal = e {
408 warn!("prepare: found fatal error, stop listen queue");
409 return;
410 }
411 }
412 Ok(b) => {
413 need_commit = b;
414 }
415 }
416 }
417
418 if let Some(&mut f) = prepare_indv {
419 let mut queue_element = Individual::new_raw(raw);
420 if parse_raw(&mut queue_element).is_ok() {
421 let mut is_processed = true;
422 if let Some(assigned_subsystems) =
423 queue_element.get_first_integer("assigned_subsystems")
424 {
425 if assigned_subsystems > 0 {
426 if let Some(my_subsystem_id) = self.subsystem_id {
427 if assigned_subsystems & my_subsystem_id == 0 {
428 is_processed = false;
429 }
430 } else {
431 is_processed = false;
432 }
433 }
434 }
435
436 if is_processed {
437 match f(backend, module_context, &mut queue_element, queue_consumer) {
438 Err(e) => {
439 if let PrepareError::Fatal = e {
440 warn!("prepare: found fatal error, stop listen queue");
441 return;
442 }
443 }
444 Ok(b) => {
445 need_commit = b;
446 }
447 }
448 }
449 }
450 }
451
452 if need_commit {
453 queue_consumer.commit();
454 }
455
456 self.queue_prepared_count += 1;
457
458 if self.queue_prepared_count % 1000 == 0 {
459 info!("get from queue, count: {}", self.queue_prepared_count);
460 }
461 prepared_batch_size += 1;
462 }
463
464 if size_batch > 0 {
465 match after_batch(backend, module_context, prepared_batch_size) {
466 Ok(b) => {
467 if b {
468 queue_consumer.commit();
469 }
470 }
471 Err(e) => {
472 if let PrepareError::Fatal = e {
473 warn!("after_batch: found fatal error, stop listen queue");
474 return;
475 }
476 }
477 }
478 }
479
480 if prepared_batch_size == size_batch {
481 if let Some(s) = &soc {
482 let wmsg = s.recv();
483 if let Err(e) = wmsg {
484 debug!("fail recv from queue notify channel, err={:?}", e);
485
486 if count_timeout_error > 0 && size_batch > 0 {
487 warn!("queue changed but we not received notify message, need reconnect...");
488 self.is_ready_notify_channel = false;
489 count_timeout_error += 1;
490 }
491 } else {
492 count_timeout_error = 0;
493 }
494 }
495 }
496
497 if let Some(t) = self.max_timeout_between_batches {
498 let delta = prev_batch_time.elapsed().as_millis() as u64;
499 if let Some(c) = self.min_batch_size_to_cancel_timeout {
500 if prepared_batch_size < c && delta < t {
501 thread::sleep(time::Duration::from_millis(t - delta));
502 info!("sleep {} ms", t - delta);
503 }
504 } else if delta < t {
505 thread::sleep(time::Duration::from_millis(t - delta));
506 info!("sleep {} ms", t - delta);
507 }
508 }
509
510 prev_batch_time = Instant::now();
511 }
512 }
513}
514
515pub fn get_inner_binobj_as_individual<'a>(
516 queue_element: &'a mut Individual,
517 field_name: &str,
518 new_indv: &'a mut Individual,
519) -> bool {
520 let binobj = queue_element.get_first_binobj(field_name);
521 if binobj.is_some() {
522 new_indv.set_raw(&binobj.unwrap_or_default());
523 if parse_raw(new_indv).is_ok() {
524 return true;
525 }
526 }
527 false
528}
529
530pub fn get_cmd(queue_element: &mut Individual) -> Option<IndvOp> {
531 let wcmd = queue_element.get_first_integer("cmd");
532 wcmd?;
533
534 Some(IndvOp::from_i64(wcmd.unwrap_or_default()))
535}
536
537pub fn init_log(module_name: &str) {
538 init_log_with_filter(module_name, None)
539}
540
541pub fn init_log_with_filter(module_name: &str, filter: Option<&str>) {
542 init_log_with_params(module_name, filter, false);
543}
544
545pub fn init_log_with_params(module_name: &str, filter: Option<&str>, with_thread_id: bool) {
546 let var_log_name = module_name.to_owned() + "_LOG";
547
548 let filters_str = match env::var(&var_log_name) {
549 Ok(val) if !val.is_empty() => {
550 println!("use env var: {}: {:?}", var_log_name, val);
551 val
552 }
553 _ => filter.unwrap_or("info").to_owned(),
554 };
555
556 if with_thread_id {
557 Builder::new()
558 .format(|buf, record| {
559 writeln!(
560 buf,
561 "{} {} [{}] - {}",
562 thread_id::get(),
563 Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
564 record.level(),
565 record.args()
566 )
567 })
568 .parse_filters(&filters_str)
569 .try_init()
570 .unwrap_or(())
571 } else {
572 Builder::new()
573 .format(|buf, record| {
574 writeln!(
575 buf,
576 "{} [{}] - {}",
577 Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
578 record.level(),
579 record.args()
580 )
581 })
582 .parse_filters(&filters_str)
583 .try_init()
584 .unwrap_or(())
585 }
586}
587
588pub fn get_info_of_module(module_name: &str) -> Option<(i64, i64)> {
589 let module_info = ModuleInfo::new("./data", module_name, false);
590 if module_info.is_err() {
591 error!(
592 "fail open info of [{}], err={:?}",
593 module_name,
594 module_info.err()
595 );
596 return None;
597 }
598
599 let mut info = module_info.unwrap();
600 info.read_info()
601}