Skip to main content

v_common/module/
module_impl.rs

1use crate::module::common::sys_sig_listener;
2use crate::module::info::ModuleInfo;
3use crate::module::veda_backend::Backend;
4use v_individual_model::onto::individual::{Individual, RawObj};
5use v_individual_model::onto::parser::parse_raw;
6use v_storage::{StorageId, VStorage};
7use crate::v_api::api_client::IndvOp;
8use chrono::Local;
9use crossbeam_channel::{select, tick, Receiver};
10use env_logger::Builder;
11use ini::Ini;
12use nng::options::protocol::pubsub::Subscribe;
13use nng::options::Options;
14use nng::options::RecvTimeout;
15use nng::{Protocol, Socket};
16use std::io::Write;
17use std::str::FromStr;
18use std::time::Duration;
19use std::time::Instant;
20use std::{env, thread, time};
21use v_queue::{consumer::*, record::*};
22
23#[derive(Debug)]
24#[repr(u8)]
25pub enum PrepareError {
26    Fatal = 101,
27    Recoverable = 102,
28}
29
30const NOTIFY_CHANNEL_RECONNECT_TIMEOUT: u64 = 300;
31
32pub struct Module {
33    pub(crate) queue_prepared_count: i64,
34    notify_channel_url: String,
35    pub(crate) is_ready_notify_channel: bool,
36    notify_channel_read_timeout: Option<u64>,
37    pub(crate) max_timeout_between_batches: Option<u64>,
38    pub(crate) min_batch_size_to_cancel_timeout: Option<u32>,
39    pub max_batch_size: Option<u32>,
40    pub(crate) subsystem_id: Option<i64>,
41    pub(crate) syssig_ch: Option<Receiver<i32>>,
42    pub(crate) name: String,
43    onto_types: Vec<String>,
44}
45
46impl Default for Module {
47    fn default() -> Self {
48        Module::create(None, "")
49    }
50}
51
52impl Module {
53    pub fn new_with_name(name: &str) -> Self {
54        Module::create(None, name)
55    }
56
57    pub fn create(module_id: Option<i64>, module_name: &str) -> Self {
58        let args: Vec<String> = env::args().collect();
59
60        let mut notify_channel_url = String::default();
61        let mut max_timeout_between_batches = None;
62        let mut min_batch_size_to_cancel_timeout = None;
63        let mut max_batch_size = None;
64        let mut notify_channel_read_timeout = None;
65
66        for el in args.iter() {
67            if el.starts_with("--max_timeout_between_batches") {
68                let p: Vec<&str> = el.split('=').collect();
69                if let Ok(v) = p[1].parse::<u64>() {
70                    max_timeout_between_batches = Some(v);
71                    info!("use {} = {} ms", p[0], v);
72                }
73            } else if el.starts_with("--min_batch_size_to_cancel_timeout") {
74                let p: Vec<&str> = el.split('=').collect();
75                if let Ok(v) = p[1].parse::<u32>() {
76                    min_batch_size_to_cancel_timeout = Some(v);
77                    info!("use {} = {}", p[0], v);
78                }
79            } else if el.starts_with("--max_batch_size") {
80                let p: Vec<&str> = el.split('=').collect();
81                if let Ok(v) = p[1].parse::<u32>() {
82                    max_batch_size = Some(v);
83                    println!("use {} = {}", p[0], v);
84                }
85            } else if el.starts_with("--notify_channel_read_timeout") {
86                let p: Vec<&str> = el.split('=').collect();
87                if let Ok(v) = p[1].parse::<u64>() {
88                    notify_channel_read_timeout = Some(v);
89                    info!("use {} = {} ms", p[0], v);
90                }
91            } else if el.starts_with("--notify_channel_url") {
92                let p: Vec<&str> = el.split('=').collect();
93                notify_channel_url = p[1].to_owned();
94            }
95        }
96
97        if notify_channel_url.is_empty() {
98            if let Some(s) = Module::get_property("notify_channel_url") {
99                notify_channel_url = s
100            }
101        }
102
103        let onto_types = vec![
104            "rdfs:Class",
105            "owl:Class",
106            "rdfs:Datatype",
107            "owl:Ontology",
108            "rdf:Property",
109            "owl:DatatypeProperty",
110            "owl:ObjectProperty",
111            "owl:OntologyProperty",
112            "owl:AnnotationProperty",
113            "v-ui:PropertySpecification",
114            "v-ui:DatatypePropertySpecification",
115            "v-ui:ObjectPropertySpecification",
116            "v-ui:TemplateSpecification",
117            "v-ui:ClassModel",
118        ];
119
120        Module {
121            queue_prepared_count: 0,
122            notify_channel_url,
123            is_ready_notify_channel: false,
124            max_timeout_between_batches,
125            min_batch_size_to_cancel_timeout,
126            max_batch_size,
127            subsystem_id: module_id,
128            notify_channel_read_timeout,
129            syssig_ch: None,
130            name: module_name.to_owned(),
131            onto_types: onto_types.iter().map(|x| x.to_string()).collect(),
132        }
133    }
134
135    pub fn new() -> Self {
136        Module::create(None, "")
137    }
138
139    // A function that retrieves a property value from a configuration file
140    // The function takes an input parameter as an argument and returns an Option<String>
141    pub fn get_property<T: FromStr>(in_param: &str) -> Option<T> {
142        // Load the configuration file "veda.properties" using the Ini library and panic if it fails
143        let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
144
145        // Extract the [alias] section from the configuration file and panic if it fails
146        let aliases = conf.section(Some("alias")).expect("fail parse veda.properties, section [alias]");
147
148        // Collect command line arguments into a vector of strings
149        let args: Vec<String> = env::args().collect();
150
151        let params = [in_param.replace('_', "-"), in_param.replace('-', "_")];
152
153        // Loop through the command line arguments and check if any of them match the possible parameter names
154        for el in args.iter() {
155            for param in &params {
156                if el.starts_with(&format!("--{}", param)) {
157                    // Split the argument into a key and a value
158                    let p: Vec<&str> = el.split('=').collect();
159
160                    // If the argument has a key and a value, retrieve the value and check for aliases
161                    if p.len() == 2 {
162                        let v = p[1].trim();
163                        let val = if let Some(a) = aliases.get(v) {
164                            info!("use arg --{}={}, alias={}", param, a, v);
165                            a
166                        } else {
167                            info!("use arg --{}={}", param, v);
168                            v
169                        };
170
171                        return val.parse().ok();
172                    }
173                }
174            }
175        }
176
177        // If the parameter was not found in the command line arguments, try to retrieve it from the configuration file
178        let section = conf.section(None::<String>).expect("fail parse veda.properties");
179
180        if let Some(v) = section.get(in_param) {
181            // If the parameter is found, retrieve its value and check for aliases
182            let mut val = v.trim().to_owned();
183
184            // If the value starts with a dollar sign ($), it is interpreted as an environment variable and the value of the variable is retrieved
185            if val.starts_with('$') {
186                if let Ok(val4var) = env::var(val.strip_prefix('$').unwrap_or_default()) {
187                    info!("get env variable [{}]", val);
188                    val = val4var;
189                } else {
190                    info!("not found env variable {}", val);
191                    return None;
192                }
193            }
194
195            // Check for aliases and log the parameter and its value
196            let res = if let Some(a) = aliases.get(&val) {
197                info!("use param [{}]={}, alias={}", in_param, a, val);
198                a
199            } else {
200                info!("use param [{}]={}", in_param, val);
201                &val
202            };
203
204            // Parse the value into the desired type and return it as an Option<T>
205            return res.parse().ok();
206        }
207
208        // If the parameter was not found in the configuration file, log an error and return None
209        error!("param [{}] not found", in_param);
210        None
211    }
212
213    pub fn is_content_onto(&self, cmd: IndvOp, new_state: &mut Individual, prev_state: &mut Individual) -> bool {
214        if cmd != IndvOp::Remove {
215            if new_state.any_exists_v("rdf:type", &self.onto_types) {
216                return true;
217            }
218        } else if prev_state.any_exists_v("rdf:type", &self.onto_types) {
219            return true;
220        }
221        false
222    }
223
224    pub fn get_sys_ticket_id_from_db(storage: &mut VStorage) -> Result<String, i32> {
225        let mut indv = Individual::default();
226        if storage.get_individual_from_storage(StorageId::Tickets, "systicket", &mut indv).is_ok() {
227            if let Some(c) = indv.get_first_literal("v-s:resource") {
228                return Ok(c);
229            }
230        }
231        Err(-1)
232    }
233
234    pub(crate) fn connect_to_notify_channel(&mut self) -> Option<Socket> {
235        if !self.is_ready_notify_channel && !self.notify_channel_url.is_empty() {
236            let soc = Socket::new(Protocol::Sub0).unwrap();
237
238            let timeout = if let Some(t) = self.notify_channel_read_timeout {
239                t
240            } else {
241                1000
242            };
243
244            if let Err(e) = soc.set_opt::<RecvTimeout>(Some(Duration::from_millis(timeout))) {
245                error!("fail set timeout, {} err={}", self.notify_channel_url, e);
246                return None;
247            }
248
249            if let Err(e) = soc.dial(&self.notify_channel_url) {
250                error!("fail connect to, {} err={}", self.notify_channel_url, e);
251                return None;
252            } else {
253                let all_topics = vec![];
254                if let Err(e) = soc.set_opt::<Subscribe>(all_topics) {
255                    error!("fail subscribe, {} err={}", self.notify_channel_url, e);
256                    soc.close();
257                    self.is_ready_notify_channel = false;
258                    return None;
259                } else {
260                    info!("success subscribe on queue changes: {}", self.notify_channel_url);
261                    self.is_ready_notify_channel = true;
262                    return Some(soc);
263                }
264            }
265        }
266        None
267    }
268
269    pub fn listen_queue_raw<T>(
270        &mut self,
271        queue_consumer: &mut Consumer,
272        module_context: &mut T,
273        before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
274        prepare: &mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>,
275        after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
276        heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
277        backend: &mut Backend,
278    ) {
279        self.listen_queue_comb(queue_consumer, module_context, before_batch, Some(prepare), None, after_batch, heartbeat, backend)
280    }
281
282    pub fn listen_queue<T>(
283        &mut self,
284        queue_consumer: &mut Consumer,
285        module_context: &mut T,
286        before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
287        prepare: &mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
288        after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
289        heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
290        backend: &mut Backend,
291    ) {
292        self.listen_queue_comb(queue_consumer, module_context, before_batch, None, Some(prepare), after_batch, heartbeat, backend)
293    }
294
295    fn listen_queue_comb<T>(
296        &mut self,
297        queue_consumer: &mut Consumer,
298        module_context: &mut T,
299        before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
300        prepare_raw: Option<&mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>>,
301        prepare_indv: Option<&mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>>,
302        after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
303        heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
304        backend: &mut Backend,
305    ) {
306        if let Ok(ch) = sys_sig_listener() {
307            self.syssig_ch = Some(ch);
308        }
309
310        let mut soc = None;
311        let mut count_timeout_error = 0;
312
313        let mut prev_batch_time = Instant::now();
314        let update = tick(Duration::from_millis(1));
315        loop {
316            if let Some(qq) = &self.syssig_ch {
317                select! {
318                    recv(update) -> _ => {
319                    }
320                    recv(qq) -> _ => {
321                        info!("queue {}/{}, part:{}, pos:{}", queue_consumer.queue.base_path, queue_consumer.name, queue_consumer.id, queue_consumer.count_popped);
322                        info!("Exit");
323                        std::process::exit (exitcode::OK);
324                        //break;
325                    }
326                }
327            }
328
329            if let Err(PrepareError::Fatal) = heartbeat(backend, module_context) {
330                error!("heartbeat: found fatal error, stop listen queue");
331                break;
332            }
333
334            if soc.is_none() {
335                soc = self.connect_to_notify_channel();
336                if soc.is_none() {
337                    thread::sleep(time::Duration::from_millis(NOTIFY_CHANNEL_RECONNECT_TIMEOUT));
338                    info!("sleep {} ms", NOTIFY_CHANNEL_RECONNECT_TIMEOUT);
339                }
340            }
341
342            // read queue current part info
343            if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
344                error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
345                continue;
346            }
347
348            let size_batch = queue_consumer.get_batch_size();
349
350            let mut max_size_batch = size_batch;
351            if let Some(m) = self.max_batch_size {
352                max_size_batch = m;
353            }
354
355            if size_batch > 0 {
356                debug!("queue: batch size={}", size_batch);
357                if let Some(new_size) = before_batch(backend, module_context, size_batch) {
358                    max_size_batch = new_size;
359                }
360            }
361
362            let mut prepared_batch_size = 0;
363            for _it in 0..max_size_batch {
364                // пробуем взять из очереди заголовок сообщения
365                if !queue_consumer.pop_header() {
366                    break;
367                }
368
369                let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
370
371                // заголовок взят успешно, занесем содержимое сообщения в структуру Individual
372                if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
373                    match e {
374                        ErrorQueue::FailReadTailMessage => {
375                            break;
376                        },
377                        ErrorQueue::InvalidChecksum => {
378                            error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
379                            queue_consumer.seek_next_pos();
380                            break;
381                        },
382                        _ => {
383                            error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
384                            break;
385                        },
386                    }
387                }
388
389                let mut need_commit = true;
390
391                if let Some(&mut f) = prepare_raw {
392                    match f(backend, module_context, &raw, queue_consumer) {
393                        Err(e) => {
394                            if let PrepareError::Fatal = e {
395                                warn!("prepare: found fatal error, stop listen queue");
396                                return;
397                            }
398                        },
399                        Ok(b) => {
400                            need_commit = b;
401                        },
402                    }
403                }
404
405                if let Some(&mut f) = prepare_indv {
406                    let mut queue_element = Individual::new_raw(raw);
407                    if parse_raw(&mut queue_element).is_ok() {
408                        let mut is_processed = true;
409                        if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
410                            if assigned_subsystems > 0 {
411                                if let Some(my_subsystem_id) = self.subsystem_id {
412                                    if assigned_subsystems & my_subsystem_id == 0 {
413                                        is_processed = false;
414                                    }
415                                } else {
416                                    is_processed = false;
417                                }
418                            }
419                        }
420
421                        if is_processed {
422                            match f(backend, module_context, &mut queue_element, queue_consumer) {
423                                Err(e) => {
424                                    if let PrepareError::Fatal = e {
425                                        warn!("prepare: found fatal error, stop listen queue");
426                                        return;
427                                    }
428                                },
429                                Ok(b) => {
430                                    need_commit = b;
431                                },
432                            }
433                        }
434                    }
435                }
436
437                if need_commit {
438                    queue_consumer.commit();
439                }
440
441                self.queue_prepared_count += 1;
442
443                if self.queue_prepared_count % 1000 == 0 {
444                    info!("get from queue, count: {}", self.queue_prepared_count);
445                }
446                prepared_batch_size += 1;
447            }
448
449            if size_batch > 0 {
450                match after_batch(backend, module_context, prepared_batch_size) {
451                    Ok(b) => {
452                        if b {
453                            queue_consumer.commit();
454                        }
455                    },
456                    Err(e) => {
457                        if let PrepareError::Fatal = e {
458                            warn!("after_batch: found fatal error, stop listen queue");
459                            return;
460                        }
461                    },
462                }
463            }
464
465            if prepared_batch_size == size_batch {
466                if let Some(s) = &soc {
467                    let wmsg = s.recv();
468                    if let Err(e) = wmsg {
469                        debug!("fail recv from queue notify channel, err={:?}", e);
470
471                        if count_timeout_error > 0 && size_batch > 0 {
472                            warn!("queue changed but we not received notify message, need reconnect...");
473                            self.is_ready_notify_channel = false;
474                            count_timeout_error += 1;
475                        }
476                    } else {
477                        count_timeout_error = 0;
478                    }
479                }
480            }
481
482            if let Some(t) = self.max_timeout_between_batches {
483                let delta = prev_batch_time.elapsed().as_millis() as u64;
484                if let Some(c) = self.min_batch_size_to_cancel_timeout {
485                    if prepared_batch_size < c && delta < t {
486                        thread::sleep(time::Duration::from_millis(t - delta));
487                        info!("sleep {} ms", t - delta);
488                    }
489                } else if delta < t {
490                    thread::sleep(time::Duration::from_millis(t - delta));
491                    info!("sleep {} ms", t - delta);
492                }
493            }
494
495            prev_batch_time = Instant::now();
496        }
497    }
498}
499
500pub fn get_inner_binobj_as_individual<'a>(queue_element: &'a mut Individual, field_name: &str, new_indv: &'a mut Individual) -> bool {
501    let binobj = queue_element.get_first_binobj(field_name);
502    if binobj.is_some() {
503        new_indv.set_raw(&binobj.unwrap_or_default());
504        if parse_raw(new_indv).is_ok() {
505            return true;
506        }
507    }
508    false
509}
510
511pub fn get_cmd(queue_element: &mut Individual) -> Option<IndvOp> {
512    let wcmd = queue_element.get_first_integer("cmd");
513    wcmd?;
514
515    Some(IndvOp::from_i64(wcmd.unwrap_or_default()))
516}
517
518pub fn init_log(module_name: &str) {
519    init_log_with_filter(module_name, None)
520}
521
522pub fn init_log_with_filter(module_name: &str, filter: Option<&str>) {
523    init_log_with_params(module_name, filter, false);
524}
525
526pub fn init_log_with_params(module_name: &str, filter: Option<&str>, with_thread_id: bool) {
527    let var_log_name = module_name.to_owned() + "_LOG";
528    match std::env::var_os(&var_log_name) {
529        Some(val) => println!("use env var: {}: {:?}", var_log_name, val.to_str()),
530        None => std::env::set_var(&var_log_name, "info"),
531    }
532
533    let filters_str = if let Some(f) = filter {
534        f.to_owned()
535    } else {
536        env::var(var_log_name).unwrap_or_default()
537    };
538
539    if with_thread_id {
540        Builder::new()
541            .format(|buf, record| {
542                writeln!(buf, "{} {} [{}] - {}", thread_id::get(), Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), record.level(), record.args())
543            })
544            .parse_filters(&filters_str)
545            .try_init()
546            .unwrap_or(())
547    } else {
548        Builder::new()
549            .format(|buf, record| writeln!(buf, "{} [{}] - {}", Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), record.level(), record.args()))
550            .parse_filters(&filters_str)
551            .try_init()
552            .unwrap_or(())
553    }
554}
555
556pub fn get_info_of_module(module_name: &str) -> Option<(i64, i64)> {
557    let module_info = ModuleInfo::new("./data", module_name, false);
558    if module_info.is_err() {
559        error!("fail open info of [{}], err={:?}", module_name, module_info.err());
560        return None;
561    }
562
563    let mut info = module_info.unwrap();
564    info.read_info()
565}
566
567pub fn wait_load_ontology() -> i64 {
568    wait_module("input-onto", 1)
569}
570
571pub fn wait_module(module_name: &str, wait_op_id: i64) -> i64 {
572    if wait_op_id < 0 {
573        error!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
574        return -1;
575    }
576
577    info!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
578    loop {
579        let module_info = ModuleInfo::new("./data", module_name, false);
580        if module_info.is_err() {
581            error!("fail open info of [{}], err={:?}", module_name, module_info.err());
582            thread::sleep(time::Duration::from_millis(300));
583            continue;
584        }
585
586        let mut info = module_info.unwrap();
587        loop {
588            if let Some((_, committed)) = info.read_info() {
589                if committed >= wait_op_id {
590                    info!("wait module [{}] to complete op_id={}, found commited_op_id={}", module_name, wait_op_id, committed);
591                    return committed;
592                }
593            } else {
594                error!("fail read info for module [{}]", module_name);
595                //break;
596            }
597            thread::sleep(time::Duration::from_millis(300));
598        }
599
600        //break;
601    }
602
603    //-1
604}