Skip to main content

v_common/module/
common.rs

1use crossbeam_channel::{bounded, Receiver};
2use signal_hook::consts::signal::*;
3use signal_hook::consts::{SIGCONT, SIGTSTP};
4use std::collections::HashSet;
5use std::io::Error;
6use std::os::raw::c_int;
7use std::thread;
8use signal_hook::iterator::Signals;
9use v_individual_model::onto::individual::Individual;
10use v_individual_model::onto::onto_impl::Onto;
11use v_individual_model::onto::onto_index::OntoIndex;
12use crate::storage::async_storage::{get_individual_from_db, AStorage};
13use v_storage::VStorage;
14use crate::v_api::common_type::ResultCode;
15use chrono::Utc;
16use signal_hook::low_level;
17use v_queue::consumer::Consumer;
18use v_queue::record::Mode;
19
20pub const DATA_BASE_PATH: &str = "./data";
21
22pub async fn c_load_onto(storage: &AStorage, onto: &mut Onto) -> bool {
23    let onto_index = OntoIndex::load();
24
25    info!("load {} onto elements", onto_index.len());
26
27    for id in onto_index.data.keys() {
28        if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
29            if res == ResultCode::Ok {
30                onto.update(&mut indv);
31            }
32        }
33    }
34
35    info!("add to hierarchy {} elements", onto.relations.len());
36
37    let keys: Vec<String> = onto.relations.keys().cloned().collect();
38
39    for el in keys.iter() {
40        let mut buf: HashSet<String> = HashSet::new();
41        onto.get_subs(el, &mut buf);
42        if !buf.is_empty() {
43            onto.update_subs(el, &mut buf);
44            //info!("{}, subs={:?}", el, buf);
45        }
46    }
47
48    info!("end update subs");
49
50    true
51}
52
53pub fn load_onto(storage: &mut VStorage, onto: &mut Onto) -> bool {
54    let onto_index = OntoIndex::load();
55
56    info!("load {} onto elements", onto_index.len());
57
58    for id in onto_index.data.keys() {
59        let mut indv: Individual = Individual::default();
60        if storage.get_individual(id, &mut indv).is_ok() {
61            onto.update(&mut indv);
62        }
63    }
64
65    info!("add to hierarchy {} elements", onto.relations.len());
66
67    let keys: Vec<String> = onto.relations.keys().cloned().collect();
68
69    for el in keys.iter() {
70        let mut buf: HashSet<String> = HashSet::new();
71        onto.get_subs(el, &mut buf);
72        if !buf.is_empty() {
73            onto.update_subs(el, &mut buf);
74            //info!("{}, subs={:?}", el, buf);
75        }
76    }
77
78    info!("end update subs");
79
80    true
81}
82
83const SIGNALS: &[c_int] = &[SIGTERM, SIGQUIT, SIGINT, SIGTSTP, SIGCONT];
84
85pub fn sys_sig_listener() -> Result<Receiver<i32>, Error> {
86    let (sender, receiver) = bounded(1);
87    thread::spawn(move || {
88        info!("Start system signal listener");
89        let mut sigs = Signals::new(SIGNALS).unwrap();
90        for signal in &mut sigs {
91            warn!("Received signal {:?}", signal);
92
93            if signal != SIGTERM {
94                low_level::emulate_default_handler(signal).unwrap();
95            }
96
97            let _ = sender.send(signal);
98        }
99    });
100
101    Ok(receiver)
102}
103
104const MAIN_QUEUE_NAME: &str = "individuals-flow";
105
106pub fn get_queue_status(id: &str) -> Individual {
107    let mut out_indv = Individual::default();
108    if let Some(consumer_name) = id.strip_prefix("srv:queue-state-") {
109        let base_path: &str = &(DATA_BASE_PATH.to_owned() + "/queue");
110        if let Ok(mut c) = Consumer::new_with_mode(base_path, consumer_name, MAIN_QUEUE_NAME, Mode::Read) {
111            c.open(false);
112            c.get_info();
113            if c.queue.get_info_of_part(c.id, false).is_ok() {
114                out_indv.set_id(id);
115                out_indv.add_uri("rdf:type", "v-s:AppInfo");
116                // Исправленная строка
117                out_indv.add_datetime("v-s:created", Utc::now().timestamp());
118                out_indv.add_uri("srv:queue", &("srv:".to_owned() + consumer_name));
119                out_indv.add_integer("srv:total_count", c.queue.count_pushed as i64);
120                out_indv.add_integer("srv:current_count", c.count_popped as i64);
121            }
122        } else {
123            error!("fail open consumer {}", consumer_name);
124        }
125    }
126    out_indv
127}
128
129#[macro_export]
130macro_rules! init_module_log {
131    ($module_name:expr) => {{
132        use git_version::git_version;
133        use version::version;
134        init_log($module_name);
135        info!("{} {} {}", $module_name, version!(), git_version!());
136    }};
137}