Skip to main content

v_common/module/
common.rs

1use crossbeam_channel::{bounded, Receiver};
2use signal_hook::consts::signal::*;
3use signal_hook::consts::{SIGCONT, SIGTSTP};
4use std::collections::HashSet;
5use std::io::Error;
6use std::os::raw::c_int;
7use std::thread;
8#[cfg(feature = "extended-siginfo")]
9type Signals = signal_hook::iterator::SignalsInfo<signal_hook::iterator::exfiltrator::origin::WithOrigin>;
10use v_individual_model::onto::individual::Individual;
11use v_individual_model::onto::onto_impl::Onto;
12use v_individual_model::onto::onto_index::OntoIndex;
13use crate::storage::async_storage::{get_individual_from_db, AStorage};
14use v_storage::VStorage;
15use crate::v_api::common_type::ResultCode;
16use chrono::Utc;
17#[cfg(not(feature = "extended-siginfo"))]
18use signal_hook::iterator::Signals;
19use signal_hook::low_level;
20use v_queue::consumer::Consumer;
21use v_queue::record::Mode;
22
23pub const DATA_BASE_PATH: &str = "./data";
24
25pub async fn c_load_onto(storage: &AStorage, onto: &mut Onto) -> bool {
26    let onto_index = OntoIndex::load();
27
28    info!("load {} onto elements", onto_index.len());
29
30    for id in onto_index.data.keys() {
31        if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
32            if res == ResultCode::Ok {
33                onto.update(&mut indv);
34            }
35        }
36    }
37
38    info!("add to hierarchy {} elements", onto.relations.len());
39
40    let keys: Vec<String> = onto.relations.keys().cloned().collect();
41
42    for el in keys.iter() {
43        let mut buf: HashSet<String> = HashSet::new();
44        onto.get_subs(el, &mut buf);
45        if !buf.is_empty() {
46            onto.update_subs(el, &mut buf);
47            //info!("{}, subs={:?}", el, buf);
48        }
49    }
50
51    info!("end update subs");
52
53    true
54}
55
56pub fn load_onto(storage: &mut VStorage, onto: &mut Onto) -> bool {
57    let onto_index = OntoIndex::load();
58
59    info!("load {} onto elements", onto_index.len());
60
61    for id in onto_index.data.keys() {
62        let mut indv: Individual = Individual::default();
63        if storage.get_individual(id, &mut indv).is_ok() {
64            onto.update(&mut indv);
65        }
66    }
67
68    info!("add to hierarchy {} elements", onto.relations.len());
69
70    let keys: Vec<String> = onto.relations.keys().cloned().collect();
71
72    for el in keys.iter() {
73        let mut buf: HashSet<String> = HashSet::new();
74        onto.get_subs(el, &mut buf);
75        if !buf.is_empty() {
76            onto.update_subs(el, &mut buf);
77            //info!("{}, subs={:?}", el, buf);
78        }
79    }
80
81    info!("end update subs");
82
83    true
84}
85
86const SIGNALS: &[c_int] = &[SIGTERM, SIGQUIT, SIGINT, SIGTSTP, SIGCONT];
87
88pub fn sys_sig_listener() -> Result<Receiver<i32>, Error> {
89    let (sender, receiver) = bounded(1);
90    thread::spawn(move || {
91        info!("Start system signal listener");
92        let mut sigs = Signals::new(SIGNALS).unwrap();
93        for signal in &mut sigs {
94            warn!("Received signal {:?}", signal);
95            #[cfg(feature = "extended-siginfo")]
96            let signal = signal.signal;
97
98            if signal != SIGTERM {
99                low_level::emulate_default_handler(signal).unwrap();
100            }
101
102            let _ = sender.send(signal);
103        }
104    });
105
106    Ok(receiver)
107}
108
109const MAIN_QUEUE_NAME: &str = "individuals-flow";
110
111pub fn get_queue_status(id: &str) -> Individual {
112    let mut out_indv = Individual::default();
113    if let Some(consumer_name) = id.strip_prefix("srv:queue-state-") {
114        let base_path: &str = &(DATA_BASE_PATH.to_owned() + "/queue");
115        if let Ok(mut c) = Consumer::new_with_mode(base_path, consumer_name, MAIN_QUEUE_NAME, Mode::Read) {
116            c.open(false);
117            c.get_info();
118            if c.queue.get_info_of_part(c.id, false).is_ok() {
119                out_indv.set_id(id);
120                out_indv.add_uri("rdf:type", "v-s:AppInfo");
121                // Исправленная строка
122                out_indv.add_datetime("v-s:created", Utc::now().timestamp());
123                out_indv.add_uri("srv:queue", &("srv:".to_owned() + consumer_name));
124                out_indv.add_integer("srv:total_count", c.queue.count_pushed as i64);
125                out_indv.add_integer("srv:current_count", c.count_popped as i64);
126            }
127        } else {
128            error!("fail open consumer {}", consumer_name);
129        }
130    }
131    out_indv
132}
133
134#[macro_export]
135macro_rules! init_module_log {
136    ($module_name:expr) => {{
137        use git_version::git_version;
138        use version::version;
139        init_log($module_name);
140        info!("{} {} {}", $module_name, version!(), git_version!());
141    }};
142}