v_common/module/
common.rs1use crossbeam_channel::{bounded, Receiver};
2use signal_hook::consts::signal::*;
3use signal_hook::consts::{SIGCONT, SIGTSTP};
4use std::collections::HashSet;
5use std::io::Error;
6use std::os::raw::c_int;
7use std::thread;
8#[cfg(feature = "extended-siginfo")]
9type Signals = signal_hook::iterator::SignalsInfo<signal_hook::iterator::exfiltrator::origin::WithOrigin>;
10use v_individual_model::onto::individual::Individual;
11use v_individual_model::onto::onto_impl::Onto;
12use v_individual_model::onto::onto_index::OntoIndex;
13use crate::storage::async_storage::{get_individual_from_db, AStorage};
14use v_storage::VStorage;
15use crate::v_api::common_type::ResultCode;
16use chrono::Utc;
17#[cfg(not(feature = "extended-siginfo"))]
18use signal_hook::iterator::Signals;
19use signal_hook::low_level;
20use v_queue::consumer::Consumer;
21use v_queue::record::Mode;
22
/// Base directory for on-disk data, given relative to the current working
/// directory. [`get_queue_status`] looks for queues in its `queue`
/// subdirectory.
pub const DATA_BASE_PATH: &str = "./data";
24
25pub async fn c_load_onto(storage: &AStorage, onto: &mut Onto) -> bool {
26 let onto_index = OntoIndex::load();
27
28 info!("load {} onto elements", onto_index.len());
29
30 for id in onto_index.data.keys() {
31 if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
32 if res == ResultCode::Ok {
33 onto.update(&mut indv);
34 }
35 }
36 }
37
38 info!("add to hierarchy {} elements", onto.relations.len());
39
40 let keys: Vec<String> = onto.relations.keys().cloned().collect();
41
42 for el in keys.iter() {
43 let mut buf: HashSet<String> = HashSet::new();
44 onto.get_subs(el, &mut buf);
45 if !buf.is_empty() {
46 onto.update_subs(el, &mut buf);
47 }
49 }
50
51 info!("end update subs");
52
53 true
54}
55
56pub fn load_onto(storage: &mut VStorage, onto: &mut Onto) -> bool {
57 let onto_index = OntoIndex::load();
58
59 info!("load {} onto elements", onto_index.len());
60
61 for id in onto_index.data.keys() {
62 let mut indv: Individual = Individual::default();
63 if storage.get_individual(id, &mut indv).is_ok() {
64 onto.update(&mut indv);
65 }
66 }
67
68 info!("add to hierarchy {} elements", onto.relations.len());
69
70 let keys: Vec<String> = onto.relations.keys().cloned().collect();
71
72 for el in keys.iter() {
73 let mut buf: HashSet<String> = HashSet::new();
74 onto.get_subs(el, &mut buf);
75 if !buf.is_empty() {
76 onto.update_subs(el, &mut buf);
77 }
79 }
80
81 info!("end update subs");
82
83 true
84}
85
86const SIGNALS: &[c_int] = &[SIGTERM, SIGQUIT, SIGINT, SIGTSTP, SIGCONT];
87
88pub fn sys_sig_listener() -> Result<Receiver<i32>, Error> {
89 let (sender, receiver) = bounded(1);
90 thread::spawn(move || {
91 info!("Start system signal listener");
92 let mut sigs = Signals::new(SIGNALS).unwrap();
93 for signal in &mut sigs {
94 warn!("Received signal {:?}", signal);
95 #[cfg(feature = "extended-siginfo")]
96 let signal = signal.signal;
97
98 if signal != SIGTERM {
99 low_level::emulate_default_handler(signal).unwrap();
100 }
101
102 let _ = sender.send(signal);
103 }
104 });
105
106 Ok(receiver)
107}
108
109const MAIN_QUEUE_NAME: &str = "individuals-flow";
110
111pub fn get_queue_status(id: &str) -> Individual {
112 let mut out_indv = Individual::default();
113 if let Some(consumer_name) = id.strip_prefix("srv:queue-state-") {
114 let base_path: &str = &(DATA_BASE_PATH.to_owned() + "/queue");
115 if let Ok(mut c) = Consumer::new_with_mode(base_path, consumer_name, MAIN_QUEUE_NAME, Mode::Read) {
116 c.open(false);
117 c.get_info();
118 if c.queue.get_info_of_part(c.id, false).is_ok() {
119 out_indv.set_id(id);
120 out_indv.add_uri("rdf:type", "v-s:AppInfo");
121 out_indv.add_datetime("v-s:created", Utc::now().timestamp());
123 out_indv.add_uri("srv:queue", &("srv:".to_owned() + consumer_name));
124 out_indv.add_integer("srv:total_count", c.queue.count_pushed as i64);
125 out_indv.add_integer("srv:current_count", c.count_popped as i64);
126 }
127 } else {
128 error!("fail open consumer {}", consumer_name);
129 }
130 }
131 out_indv
132}
133
/// Initializes logging for a module and logs its name and version.
///
/// Expands to a call to `init_log` with the given expression, followed by an
/// `info!` record containing the module name, `version!()` and
/// `git_version!()`.
///
/// `init_log` and `info!` are not qualified, so they are resolved where the
/// macro is expanded; the `version` and `git_version` crates must be
/// available there as well. The argument expression appears twice in the
/// expansion and is therefore evaluated twice.
#[macro_export]
macro_rules! init_module_log {
    ($name:expr) => {{
        use git_version::git_version;
        use version::version;

        init_log($name);
        info!("{} {} {}", $name, version!(), git_version!());
    }};
}
142}