v_common_module/
module.rs

1use crate::common::sys_sig_listener;
2use crate::info::ModuleInfo;
3use crate::ticket::Ticket;
4use crate::veda_backend::Backend;
5use chrono::{Local, NaiveDateTime, Utc};
6use crossbeam_channel::{select, tick, Receiver};
7use env_logger::Builder;
8use ini::Ini;
9use nng::options::protocol::pubsub::Subscribe;
10use nng::options::Options;
11use nng::options::RecvTimeout;
12use nng::{Protocol, Socket};
13use std::io::Write;
14use std::time::Duration;
15use std::time::Instant;
16use std::{env, thread, time};
17use uuid::Uuid;
18use v_api::app::ResultCode;
19use v_api::*;
20use v_onto::datatype::Lang;
21use v_onto::individual::*;
22use v_onto::individual2msgpack::to_msgpack;
23use v_onto::parser::*;
24use v_queue::{consumer::*, record::*};
25use v_storage::storage::*;
26
27#[derive(Debug)]
28#[repr(u8)]
29pub enum PrepareError {
30    Fatal = 101,
31    Recoverable = 102,
32}
33
34const TICKS_TO_UNIX_EPOCH: i64 = 62_135_596_800_000;
35
36pub struct Module {
37    pub(crate) queue_prepared_count: i64,
38    notify_channel_url: String,
39    pub(crate) is_ready_notify_channel: bool,
40    notify_channel_read_timeout: Option<u64>,
41    pub(crate) max_timeout_between_batches: Option<u64>,
42    pub(crate) min_batch_size_to_cancel_timeout: Option<u32>,
43    pub max_batch_size: Option<u32>,
44    pub(crate) subsystem_id: Option<i64>,
45    pub(crate) syssig_ch: Option<Receiver<i32>>,
46    pub(crate) name: String,
47    onto_types: Vec<String>,
48}
49
50impl Default for Module {
51    fn default() -> Self {
52        Module::create(None, "")
53    }
54}
55
56impl Module {
57    pub fn new_with_name(name: &str) -> Self {
58        Module::create(None, name)
59    }
60
61    pub fn create(module_id: Option<i64>, module_name: &str) -> Self {
62        let args: Vec<String> = env::args().collect();
63
64        let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
65        let section = conf.section(None::<String>).expect("fail parse veda.properties");
66
67        let mut notify_channel_url = String::default();
68        let mut max_timeout_between_batches = None;
69        let mut min_batch_size_to_cancel_timeout = None;
70        let mut max_batch_size = None;
71        let mut notify_channel_read_timeout = None;
72
73        for el in args.iter() {
74            if el.starts_with("--max_timeout_between_batches") {
75                let p: Vec<&str> = el.split('=').collect();
76                if let Ok(v) = p[1].parse::<u64>() {
77                    max_timeout_between_batches = Some(v);
78                    info!("use {} = {} ms", p[0], v);
79                }
80            } else if el.starts_with("--min_batch_size_to_cancel_timeout") {
81                let p: Vec<&str> = el.split('=').collect();
82                if let Ok(v) = p[1].parse::<u32>() {
83                    min_batch_size_to_cancel_timeout = Some(v);
84                    info!("use {} = {}", p[0], v);
85                }
86            } else if el.starts_with("--max_batch_size") {
87                let p: Vec<&str> = el.split('=').collect();
88                if let Ok(v) = p[1].parse::<u32>() {
89                    max_batch_size = Some(v);
90                    info!("use {} = {}", p[0], v);
91                }
92            } else if el.starts_with("--notify_channel_read_timeout") {
93                let p: Vec<&str> = el.split('=').collect();
94                if let Ok(v) = p[1].parse::<u64>() {
95                    notify_channel_read_timeout = Some(v);
96                    info!("use {} = {} ms", p[0], v);
97                }
98            } else if el.starts_with("--notify_channel_url") {
99                let p: Vec<&str> = el.split('=').collect();
100                notify_channel_url = p[1].to_owned();
101            }
102        }
103
104        if notify_channel_url.is_empty() {
105            if let Some(s) = section.get("notify_channel_url") {
106                notify_channel_url = s.to_owned()
107            }
108        }
109
110        let onto_types = vec![
111            "rdfs:Class",
112            "owl:Class",
113            "rdfs:Datatype",
114            "owl:Ontology",
115            "rdf:Property",
116            "owl:DatatypeProperty",
117            "owl:ObjectProperty",
118            "owl:OntologyProperty",
119            "owl:AnnotationProperty",
120            "v-ui:PropertySpecification",
121            "v-ui:DatatypePropertySpecification",
122            "v-ui:ObjectPropertySpecification",
123            "v-ui:TemplateSpecification",
124            "v-ui:ClassModel",
125        ];
126
127        Module {
128            queue_prepared_count: 0,
129            notify_channel_url,
130            is_ready_notify_channel: false,
131            max_timeout_between_batches,
132            min_batch_size_to_cancel_timeout,
133            max_batch_size,
134            subsystem_id: module_id,
135            notify_channel_read_timeout,
136            syssig_ch: None,
137            name: module_name.to_owned(),
138            onto_types: onto_types.iter().map(|x| x.to_string()).collect(),
139        }
140    }
141
142    pub fn new() -> Self {
143        Module::create(None, "")
144    }
145
146    pub fn get_property(param: &str) -> Option<String> {
147        let args: Vec<String> = env::args().collect();
148        for el in args.iter() {
149            if el.starts_with(&format!("--{}", param)) {
150                let p: Vec<&str> = el.split('=').collect();
151                if p.len() == 2 {
152                    let v = p[1].trim();
153                    info!("use arg --{}={}", param, v);
154                    return Some(p[1].to_owned());
155                }
156            }
157        }
158
159        let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
160
161        let section = conf.section(None::<String>).expect("fail parse veda.properties");
162        if let Some(v) = section.get(param) {
163            let v = v.trim();
164            info!("use param {}={}", param, v);
165            return Some(v.to_string());
166        }
167
168        error!("param [{}] not found", param);
169        None
170    }
171
172    pub fn is_content_onto(&self, cmd: IndvOp, new_state: &mut Individual, prev_state: &mut Individual) -> bool {
173        if cmd != IndvOp::Remove {
174            if new_state.any_exists_v("rdf:type", &self.onto_types) {
175                return true;
176            }
177        } else if prev_state.any_exists_v("rdf:type", &self.onto_types) {
178            return true;
179        }
180        false
181    }
182
183    pub fn get_sys_ticket_id_from_db(storage: &mut VStorage) -> Result<String, i32> {
184        let mut indv = Individual::default();
185        if storage.get_individual_from_db(StorageId::Tickets, "systicket", &mut indv) {
186            if let Some(c) = indv.get_first_literal("v-s:resource") {
187                return Ok(c);
188            }
189        }
190        Err(-1)
191    }
192
193    pub(crate) fn connect_to_notify_channel(&mut self) -> Option<Socket> {
194        if !self.is_ready_notify_channel && !self.notify_channel_url.is_empty() {
195            let soc = Socket::new(Protocol::Sub0).unwrap();
196
197            let timeout = if let Some(t) = self.notify_channel_read_timeout {
198                t
199            } else {
200                1000
201            };
202
203            if let Err(e) = soc.set_opt::<RecvTimeout>(Some(Duration::from_millis(timeout))) {
204                error!("fail set timeout, {} err={}", self.notify_channel_url, e);
205                return None;
206            }
207
208            if let Err(e) = soc.dial(&self.notify_channel_url) {
209                error!("fail connect to, {} err={}", self.notify_channel_url, e);
210                return None;
211            } else {
212                let all_topics = vec![];
213                if let Err(e) = soc.set_opt::<Subscribe>(all_topics) {
214                    error!("fail subscribe, {} err={}", self.notify_channel_url, e);
215                    soc.close();
216                    self.is_ready_notify_channel = false;
217                    return None;
218                } else {
219                    info!("success subscribe on queue changes: {}", self.notify_channel_url);
220                    self.is_ready_notify_channel = true;
221                    return Some(soc);
222                }
223            }
224        }
225        None
226    }
227
228    pub fn listen_queue_raw<T>(
229        &mut self,
230        queue_consumer: &mut Consumer,
231        module_context: &mut T,
232        before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
233        prepare: &mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>,
234        after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
235        heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
236        backend: &mut Backend,
237    ) {
238        self.listen_queue_comb(queue_consumer, module_context, before_batch, Some(prepare), None, after_batch, heartbeat, backend)
239    }
240
241    pub fn listen_queue<T>(
242        &mut self,
243        queue_consumer: &mut Consumer,
244        module_context: &mut T,
245        before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
246        prepare: &mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
247        after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
248        heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
249        backend: &mut Backend,
250    ) {
251        self.listen_queue_comb(queue_consumer, module_context, before_batch, None, Some(prepare), after_batch, heartbeat, backend)
252    }
253
254    fn listen_queue_comb<T>(
255        &mut self,
256        queue_consumer: &mut Consumer,
257        module_context: &mut T,
258        before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
259        prepare_raw: Option<&mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>>,
260        prepare_indv: Option<&mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>>,
261        after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
262        heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
263        backend: &mut Backend,
264    ) {
265        if let Ok(ch) = sys_sig_listener() {
266            self.syssig_ch = Some(ch);
267        }
268
269        let mut soc = Socket::new(Protocol::Sub0).unwrap();
270        let mut count_timeout_error = 0;
271
272        let mut prev_batch_time = Instant::now();
273        let update = tick(Duration::from_millis(1));
274        loop {
275            if let Some(qq) = &self.syssig_ch {
276                select! {
277                    recv(update) -> _ => {
278                    }
279                    recv(qq) -> _ => {
280                        info!("Exit");
281                        std::process::exit (exitcode::OK);
282                        //break;
283                    }
284                }
285            }
286
287            match heartbeat(backend, module_context) {
288                Err(e) => {
289                    if let PrepareError::Fatal = e {
290                        error!("heartbeat: found fatal error, stop listen queue");
291                        break;
292                    }
293                }
294                _ => {}
295            }
296
297            if let Some(s) = self.connect_to_notify_channel() {
298                soc = s;
299            }
300
301            // read queue current part info
302            if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
303                error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
304                continue;
305            }
306
307            let size_batch = queue_consumer.get_batch_size();
308
309            let mut max_size_batch = size_batch;
310            if let Some(m) = self.max_batch_size {
311                max_size_batch = m;
312            }
313
314            if size_batch > 0 {
315                debug!("queue: batch size={}", size_batch);
316                if let Some(new_size) = before_batch(backend, module_context, size_batch) {
317                    max_size_batch = new_size;
318                }
319            }
320
321            let mut prepared_batch_size = 0;
322            for _it in 0..max_size_batch {
323                // пробуем взять из очереди заголовок сообщения
324                if !queue_consumer.pop_header() {
325                    break;
326                }
327
328                let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
329
330                // заголовок взят успешно, занесем содержимое сообщения в структуру Individual
331                if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
332                    match e {
333                        ErrorQueue::FailReadTailMessage => {
334                            break;
335                        }
336                        ErrorQueue::InvalidChecksum => {
337                            error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
338                            queue_consumer.seek_next_pos();
339                            break;
340                        }
341                        _ => {
342                            error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
343                            break;
344                        }
345                    }
346                }
347
348                let mut need_commit = true;
349
350                if let Some(&mut f) = prepare_raw {
351                    match f(backend, module_context, &raw, queue_consumer) {
352                        Err(e) => {
353                            if let PrepareError::Fatal = e {
354                                warn!("prepare: found fatal error, stop listen queue");
355                                return;
356                            }
357                        }
358                        Ok(b) => {
359                            need_commit = b;
360                        }
361                    }
362                }
363
364                if let Some(&mut f) = prepare_indv {
365                    let mut queue_element = Individual::new_raw(raw);
366                    if parse_raw(&mut queue_element).is_ok() {
367                        let mut is_processed = true;
368                        if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
369                            if assigned_subsystems > 0 {
370                                if let Some(my_subsystem_id) = self.subsystem_id {
371                                    if assigned_subsystems & my_subsystem_id == 0 {
372                                        is_processed = false;
373                                    }
374                                } else {
375                                    is_processed = false;
376                                }
377                            }
378                        }
379
380                        if is_processed {
381                            match f(backend, module_context, &mut queue_element, queue_consumer) {
382                                Err(e) => {
383                                    if let PrepareError::Fatal = e {
384                                        warn!("prepare: found fatal error, stop listen queue");
385                                        return;
386                                    }
387                                }
388                                Ok(b) => {
389                                    need_commit = b;
390                                }
391                            }
392                        }
393                    }
394                }
395
396                queue_consumer.next(need_commit);
397
398                self.queue_prepared_count += 1;
399
400                if self.queue_prepared_count % 1000 == 0 {
401                    info!("get from queue, count: {}", self.queue_prepared_count);
402                }
403                prepared_batch_size += 1;
404            }
405
406            if size_batch > 0 {
407                match after_batch(backend, module_context, prepared_batch_size) {
408                    Ok(b) => {
409                        if b {
410                            queue_consumer.commit();
411                        }
412                    }
413                    Err(e) => {
414                        if let PrepareError::Fatal = e {
415                            warn!("after_batch: found fatal error, stop listen queue");
416                            return;
417                        }
418                    }
419                }
420            }
421
422            if prepared_batch_size == size_batch {
423                let wmsg = soc.recv();
424                if let Err(e) = wmsg {
425                    debug!("fail recv from queue notify channel, err={:?}", e);
426
427                    if count_timeout_error > 0 && size_batch > 0 {
428                        warn!("queue changed but we not received notify message, need reconnect...");
429                        self.is_ready_notify_channel = false;
430                        count_timeout_error += 1;
431                    }
432                } else {
433                    count_timeout_error = 0;
434                }
435            }
436
437            if let Some(t) = self.max_timeout_between_batches {
438                let delta = prev_batch_time.elapsed().as_millis() as u64;
439                if let Some(c) = self.min_batch_size_to_cancel_timeout {
440                    if prepared_batch_size < c && delta < t {
441                        thread::sleep(time::Duration::from_millis(t - delta));
442                        info!("sleep {} ms", t - delta);
443                    }
444                } else if delta < t {
445                    thread::sleep(time::Duration::from_millis(t - delta));
446                    info!("sleep {} ms", t - delta);
447                }
448            }
449
450            prev_batch_time = Instant::now();
451        }
452    }
453}
454
455pub fn get_inner_binobj_as_individual<'a>(queue_element: &'a mut Individual, field_name: &str, new_indv: &'a mut Individual) -> bool {
456    let binobj = queue_element.get_first_binobj(field_name);
457    if binobj.is_some() {
458        new_indv.set_raw(&binobj.unwrap_or_default());
459        if parse_raw(new_indv).is_ok() {
460            return true;
461        }
462    }
463    false
464}
465
466pub fn get_cmd(queue_element: &mut Individual) -> Option<IndvOp> {
467    let wcmd = queue_element.get_first_integer("cmd");
468    wcmd?;
469
470    Some(IndvOp::from_i64(wcmd.unwrap_or_default()))
471}
472
473pub fn init_log(module_name: &str) {
474    init_log_with_filter(module_name, None)
475}
476
477pub fn init_log_with_filter(module_name: &str, filter: Option<&str>) {
478    let var_log_name = module_name.to_owned() + "_LOG";
479    match std::env::var_os(var_log_name.to_owned()) {
480        Some(val) => println!("use env var: {}: {:?}", var_log_name, val.to_str()),
481        None => std::env::set_var(var_log_name.to_owned(), "info"),
482    }
483
484    let filters_str = if let Some(f) = filter {
485        f.to_owned()
486    } else {
487        env::var(var_log_name).unwrap_or_default()
488    };
489
490    Builder::new()
491        .format(|buf, record| writeln!(buf, "{} [{}] - {}", Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), record.level(), record.args()))
492        .parse_filters(&filters_str)
493        .init()
494}
495
496pub fn create_new_ticket(login: &str, user_id: &str, duration: i64, ticket: &mut Ticket, storage: &mut VStorage) {
497    let mut ticket_indv = Individual::default();
498
499    ticket.result = ResultCode::FailStore;
500    ticket_indv.add_string("rdf:type", "ticket:ticket", Lang::NONE);
501
502    if !ticket.id.is_empty() && !ticket.id.is_empty() {
503        ticket_indv.set_id(&ticket.id);
504    } else {
505        ticket_indv.set_id(&Uuid::new_v4().to_hyphenated().to_string());
506    }
507
508    ticket_indv.add_string("ticket:login", login, Lang::NONE);
509    ticket_indv.add_string("ticket:accessor", user_id, Lang::NONE);
510
511    let now = Utc::now();
512    let start_time_str = format!("{:?}", now.naive_utc());
513
514    if start_time_str.len() > 28 {
515        ticket_indv.add_string("ticket:when", &start_time_str[0..28], Lang::NONE);
516    } else {
517        ticket_indv.add_string("ticket:when", &start_time_str, Lang::NONE);
518    }
519
520    ticket_indv.add_string("ticket:duration", &duration.to_string(), Lang::NONE);
521
522    let mut raw1: Vec<u8> = Vec::new();
523    if to_msgpack(&ticket_indv, &mut raw1).is_ok() && storage.put_kv_raw(StorageId::Tickets, ticket_indv.get_id(), raw1) {
524        ticket.update_from_individual(&mut ticket_indv);
525        ticket.result = ResultCode::Ok;
526        ticket.start_time = (TICKS_TO_UNIX_EPOCH + now.timestamp_millis()) * 10_000;
527        ticket.end_time = ticket.start_time + duration as i64 * 10_000_000;
528
529        let end_time_str = format!("{:?}", NaiveDateTime::from_timestamp((ticket.end_time / 10_000 - TICKS_TO_UNIX_EPOCH) / 1_000, 0));
530        info!("create new ticket {}, login={}, user={}, start={}, end={}", ticket.id, ticket.user_login, ticket.user_uri, start_time_str, end_time_str);
531    } else {
532        error!("fail store ticket {:?}", ticket)
533    }
534}
535
536pub fn create_sys_ticket(storage: &mut VStorage) -> Ticket {
537    let mut ticket = Ticket::default();
538    create_new_ticket("veda", "cfg:VedaSystem", 90_000_000, &mut ticket, storage);
539
540    if ticket.result == ResultCode::Ok {
541        let mut sys_ticket_link = Individual::default();
542        sys_ticket_link.set_id("systicket");
543        sys_ticket_link.add_uri("rdf:type", "rdfs:Resource");
544        sys_ticket_link.add_uri("v-s:resource", &ticket.id);
545        let mut raw1: Vec<u8> = Vec::new();
546        if to_msgpack(&sys_ticket_link, &mut raw1).is_ok() && storage.put_kv_raw(StorageId::Tickets, sys_ticket_link.get_id(), raw1) {
547            return ticket;
548        } else {
549            error!("fail store system ticket link")
550        }
551    } else {
552        error!("fail create sys ticket")
553    }
554
555    ticket
556}
557
558pub fn get_info_of_module(module_name: &str) -> Option<(i64, i64)> {
559    let module_info = ModuleInfo::new("./data", module_name, false);
560    if module_info.is_err() {
561        error!("fail open info of [{}], err={:?}", module_name, module_info.err());
562        return None;
563    }
564
565    let mut info = module_info.unwrap();
566    info.read_info()
567}
568
569pub fn wait_load_ontology() -> i64 {
570    wait_module("input-onto", 1)
571}
572
573pub fn wait_module(module_name: &str, wait_op_id: i64) -> i64 {
574    if wait_op_id < 0 {
575        error!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
576        return -1;
577    }
578
579    info!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
580    loop {
581        let module_info = ModuleInfo::new("./data", module_name, false);
582        if module_info.is_err() {
583            error!("fail open info of [{}], err={:?}", module_name, module_info.err());
584            thread::sleep(time::Duration::from_millis(300));
585            continue;
586        }
587
588        let mut info = module_info.unwrap();
589        loop {
590            if let Some((_, committed)) = info.read_info() {
591                if committed >= wait_op_id {
592                    info!("wait module [{}] to complete op_id={}, found commited_op_id={}", module_name, wait_op_id, committed);
593                    return committed;
594                }
595            } else {
596                error!("fail read info for module [{}]", module_name);
597                //break;
598            }
599            thread::sleep(time::Duration::from_millis(300));
600        }
601
602        //break;
603    }
604
605    //-1
606}