v_common/module/
common.rs1use std::collections::HashSet;
2use v_individual_model::onto::individual::Individual;
3use v_individual_model::onto::onto_impl::Onto;
4use v_individual_model::onto::onto_index::OntoIndex;
5use crate::storage::async_storage::{get_individual_from_db, AStorage};
6use v_storage::VStorage;
7use crate::v_api::common_type::ResultCode;
8use chrono::Utc;
9use v_queue::consumer::Consumer;
10use v_queue::record::Mode;
11
/// Base directory for on-disk data, relative to the process working directory.
///
/// `get_queue_status` resolves the queue directory as `<DATA_BASE_PATH>/queue`.
pub const DATA_BASE_PATH: &str = "./data";
13
14pub async fn c_load_onto(storage: &AStorage, onto: &mut Onto) -> bool {
15 let onto_index = OntoIndex::load();
16
17 info!("load {} onto elements", onto_index.len());
18
19 for id in onto_index.data.keys() {
20 if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
21 if res == ResultCode::Ok {
22 onto.update(&mut indv);
23 }
24 }
25 }
26
27 info!("add to hierarchy {} elements", onto.relations.len());
28
29 let keys: Vec<String> = onto.relations.keys().cloned().collect();
30
31 for el in keys.iter() {
32 let mut buf: HashSet<String> = HashSet::new();
33 onto.get_subs(el, &mut buf);
34 if !buf.is_empty() {
35 onto.update_subs(el, &mut buf);
36 }
38 }
39
40 info!("end update subs");
41
42 true
43}
44
45pub fn load_onto(storage: &mut VStorage, onto: &mut Onto) -> bool {
46 let onto_index = OntoIndex::load();
47
48 info!("load {} onto elements", onto_index.len());
49
50 for id in onto_index.data.keys() {
51 let mut indv: Individual = Individual::default();
52 if storage.get_individual(id, &mut indv).is_ok() {
53 onto.update(&mut indv);
54 }
55 }
56
57 info!("add to hierarchy {} elements", onto.relations.len());
58
59 let keys: Vec<String> = onto.relations.keys().cloned().collect();
60
61 for el in keys.iter() {
62 let mut buf: HashSet<String> = HashSet::new();
63 onto.get_subs(el, &mut buf);
64 if !buf.is_empty() {
65 onto.update_subs(el, &mut buf);
66 }
68 }
69
70 info!("end update subs");
71
72 true
73}
74
75pub use v_module_queue::sys_sig_listener;
76
/// Name of the queue whose consumers `get_queue_status` inspects.
const MAIN_QUEUE_NAME: &str = "individuals-flow";
78
79pub fn get_queue_status(id: &str) -> Individual {
80 let mut out_indv = Individual::default();
81 if let Some(consumer_name) = id.strip_prefix("srv:queue-state-") {
82 let base_path: &str = &(DATA_BASE_PATH.to_owned() + "/queue");
83 if let Ok(mut c) = Consumer::new_with_mode(base_path, consumer_name, MAIN_QUEUE_NAME, Mode::Read) {
84 c.open(false);
85 c.get_info();
86 if c.queue.get_info_of_part(c.id, false).is_ok() {
87 out_indv.set_id(id);
88 out_indv.add_uri("rdf:type", "v-s:AppInfo");
89 out_indv.add_datetime("v-s:created", Utc::now().timestamp());
91 out_indv.add_uri("srv:queue", &("srv:".to_owned() + consumer_name));
92 out_indv.add_integer("srv:total_count", c.queue.count_pushed as i64);
93 out_indv.add_integer("srv:current_count", c.count_popped as i64);
94 }
95 } else {
96 error!("fail open consumer {}", consumer_name);
97 }
98 }
99 out_indv
100}
101
/// Initializes logging for a module and logs its name, crate version and git revision.
///
/// Expands to a call to `init_log(<module_name>)` followed by one `info!` line of the
/// form `<module_name> <version!()> <git_version!()>`.
///
/// `init_log` and `info!` are not imported by the macro, so both must be in scope at
/// the call site. The `git_version` and `version` crates must be dependencies of the
/// invoking crate, because the expansion imports their macros.
#[macro_export]
macro_rules! init_module_log {
    ($module_name:expr) => {{
        use git_version::git_version;
        use version::version;
        // Bind the argument once: the expression is evaluated a single time, and any
        // temporaries it creates stay alive for both uses below.
        match $module_name {
            module_name => {
                init_log(module_name);
                info!("{} {} {}", module_name, version!(), git_version!());
            }
        }
    }};
}