v_common_module/
common.rs

1use crossbeam_channel::{bounded, Receiver};
2use signal_hook::consts::signal::*;
3use signal_hook::consts::{SIGCONT, SIGHUP, SIGTSTP, SIGWINCH};
4use std::collections::HashSet;
5use std::io::Error;
6use std::os::raw::c_int;
7use std::thread;
8use v_api::v_onto::individual::*;
9use v_api::v_onto::onto::*;
10use v_api::v_onto::onto_index::OntoIndex;
11use v_storage::storage::VStorage;
12#[cfg(feature = "extended-siginfo")]
13type Signals = signal_hook::iterator::SignalsInfo<signal_hook::iterator::exfiltrator::origin::WithOrigin>;
14use chrono::Utc;
15#[cfg(not(feature = "extended-siginfo"))]
16use signal_hook::iterator::Signals;
17use signal_hook::low_level;
18use v_queue::consumer::Consumer;
19use v_queue::record::Mode;
20
21pub const DATA_BASE_PATH: &str = "./data";
22
23pub fn load_onto(storage: &mut VStorage, onto: &mut Onto) -> bool {
24    let onto_index = OntoIndex::load();
25
26    info!("load {} onto elements", onto_index.len());
27
28    for id in onto_index.data.keys() {
29        let mut indv: Individual = Individual::default();
30        if storage.get_individual(&id, &mut indv) {
31            onto.update(&mut indv);
32        }
33    }
34
35    info!("add to hierarchy {} elements", onto.relations.len());
36
37    let keys: Vec<String> = onto.relations.iter().map(|(key, _)| key.clone()).collect();
38
39    for el in keys.iter() {
40        let mut buf: HashSet<String> = HashSet::new();
41        onto.get_subs(el, &mut buf);
42        if !buf.is_empty() {
43            onto.update_subs(el, &mut buf);
44            //info!("{}, subs={:?}", el, buf);
45        }
46    }
47
48    info!("end update subs");
49
50    true
51}
52
53const SIGNALS: &[c_int] = &[SIGTERM, SIGQUIT, SIGINT, SIGTSTP, SIGWINCH, SIGHUP, SIGCHLD, SIGCONT];
54
55pub fn sys_sig_listener() -> Result<Receiver<i32>, Error> {
56    let (sender, receiver) = bounded(1);
57    thread::spawn(move || {
58        info!("Start system signal listener");
59        let mut sigs = Signals::new(SIGNALS).unwrap();
60        for signal in &mut sigs {
61            warn!("Received signal {:?}", signal);
62            #[cfg(feature = "extended-siginfo")]
63            let signal = signal.signal;
64
65            if signal != SIGTERM {
66                low_level::emulate_default_handler(signal).unwrap();
67            }
68
69            let _ = sender.send(signal);
70        }
71    });
72
73    Ok(receiver)
74}
75
76const MAIN_QUEUE_NAME: &str = "individuals-flow";
77
78pub fn get_queue_status(id: &str) -> Individual {
79    let mut out_indv = Individual::default();
80    if let Some(consumer_name) = id.strip_prefix("srv:queue-state-") {
81        let base_path: &str = &(DATA_BASE_PATH.to_owned() + "/queue");
82        if let Ok(mut c) = Consumer::new_with_mode(base_path, consumer_name, MAIN_QUEUE_NAME, Mode::Read) {
83            c.open(false);
84            c.get_info();
85            if c.queue.get_info_of_part(c.id, false).is_ok() {
86                out_indv.set_id(id);
87                out_indv.add_uri("rdf:type", "v-s:AppInfo");
88                out_indv.add_datetime("v-s:created", Utc::now().naive_utc().timestamp());
89                out_indv.add_uri("srv:queue", &("srv:".to_owned() + consumer_name));
90                out_indv.add_integer("srv:total_count", c.queue.count_pushed as i64);
91                out_indv.add_integer("srv:current_count", c.count_popped as i64);
92            }
93        }
94    }
95    out_indv
96}