1use crate::common::sys_sig_listener;
2use crate::info::ModuleInfo;
3use crate::ticket::Ticket;
4use crate::veda_backend::Backend;
5use chrono::{Local, NaiveDateTime, Utc};
6use crossbeam_channel::{select, tick, Receiver};
7use env_logger::Builder;
8use ini::Ini;
9use nng::options::protocol::pubsub::Subscribe;
10use nng::options::Options;
11use nng::options::RecvTimeout;
12use nng::{Protocol, Socket};
13use std::io::Write;
14use std::time::Duration;
15use std::time::Instant;
16use std::{env, thread, time};
17use uuid::Uuid;
18use v_api::app::ResultCode;
19use v_api::*;
20use v_onto::datatype::Lang;
21use v_onto::individual::*;
22use v_onto::individual2msgpack::to_msgpack;
23use v_onto::parser::*;
24use v_queue::{consumer::*, record::*};
25use v_storage::storage::*;
26
/// Failure classification returned by the callbacks that the queue listener invokes.
///
/// The listener in this file stops as soon as a callback reports [`PrepareError::Fatal`];
/// a [`PrepareError::Recoverable`] result is ignored and processing goes on.
#[repr(u8)]
#[derive(Debug)]
pub enum PrepareError {
    /// Unrecoverable failure; numeric code 101.
    Fatal = 101,
    /// Failure after which the listener keeps running; numeric code 102.
    Recoverable = 102,
}
33
/// Offset between 0001-01-01T00:00:00 and the Unix epoch, expressed in milliseconds
/// (719 162 days). Despite the name it is not in ticks: it is added to a Unix timestamp in
/// milliseconds, and the sum is then scaled by 10 000 to obtain 100-nanosecond ticks.
const TICKS_TO_UNIX_EPOCH: i64 = 719_162 * 86_400 * 1_000;
35
/// Runtime state and settings of a queue-processing module.
///
/// Instances are built by [`Module::create`], which fills the optional settings from
/// `--key=value` command-line arguments.
pub struct Module {
    /// Number of queue records taken by the listen loop so far (incremented once per record).
    pub(crate) queue_prepared_count: i64,
    /// Address of the notify channel that is dialed; empty means "do not connect".
    notify_channel_url: String,
    /// `true` once the notify channel is subscribed; set back to `false` to force a reconnect.
    pub(crate) is_ready_notify_channel: bool,
    /// Receive timeout for the notify channel, in milliseconds (1000 is used when `None`).
    notify_channel_read_timeout: Option<u64>,
    /// Minimum time, in milliseconds, between the starts of two batches; the listen loop sleeps
    /// for the remainder when a batch finishes sooner.
    pub(crate) max_timeout_between_batches: Option<u64>,
    /// When set, the pause above is applied only if fewer records than this were processed.
    pub(crate) min_batch_size_to_cancel_timeout: Option<u32>,
    /// Upper bound on the number of records taken in one batch.
    pub max_batch_size: Option<u32>,
    /// Bit mask of this module; tested against the `assigned_subsystems` field of queue records.
    pub(crate) subsystem_id: Option<i64>,
    /// Channel fed by the system-signal listener; a message on it terminates the process.
    pub(crate) syssig_ch: Option<Receiver<i32>>,
    /// Module name as passed to the constructor.
    pub(crate) name: String,
    /// `rdf:type` values that mark an individual as ontology content.
    onto_types: Vec<String>,
}
49
50impl Default for Module {
51 fn default() -> Self {
52 Module::create(None, "")
53 }
54}
55
56impl Module {
57 pub fn new_with_name(name: &str) -> Self {
58 Module::create(None, name)
59 }
60
61 pub fn create(module_id: Option<i64>, module_name: &str) -> Self {
62 let args: Vec<String> = env::args().collect();
63
64 let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
65 let section = conf.section(None::<String>).expect("fail parse veda.properties");
66
67 let mut notify_channel_url = String::default();
68 let mut max_timeout_between_batches = None;
69 let mut min_batch_size_to_cancel_timeout = None;
70 let mut max_batch_size = None;
71 let mut notify_channel_read_timeout = None;
72
73 for el in args.iter() {
74 if el.starts_with("--max_timeout_between_batches") {
75 let p: Vec<&str> = el.split('=').collect();
76 if let Ok(v) = p[1].parse::<u64>() {
77 max_timeout_between_batches = Some(v);
78 info!("use {} = {} ms", p[0], v);
79 }
80 } else if el.starts_with("--min_batch_size_to_cancel_timeout") {
81 let p: Vec<&str> = el.split('=').collect();
82 if let Ok(v) = p[1].parse::<u32>() {
83 min_batch_size_to_cancel_timeout = Some(v);
84 info!("use {} = {}", p[0], v);
85 }
86 } else if el.starts_with("--max_batch_size") {
87 let p: Vec<&str> = el.split('=').collect();
88 if let Ok(v) = p[1].parse::<u32>() {
89 max_batch_size = Some(v);
90 info!("use {} = {}", p[0], v);
91 }
92 } else if el.starts_with("--notify_channel_read_timeout") {
93 let p: Vec<&str> = el.split('=').collect();
94 if let Ok(v) = p[1].parse::<u64>() {
95 notify_channel_read_timeout = Some(v);
96 info!("use {} = {} ms", p[0], v);
97 }
98 } else if el.starts_with("--notify_channel_url") {
99 let p: Vec<&str> = el.split('=').collect();
100 notify_channel_url = p[1].to_owned();
101 }
102 }
103
104 if notify_channel_url.is_empty() {
105 if let Some(s) = section.get("notify_channel_url") {
106 notify_channel_url = s.to_owned()
107 }
108 }
109
110 let onto_types = vec![
111 "rdfs:Class",
112 "owl:Class",
113 "rdfs:Datatype",
114 "owl:Ontology",
115 "rdf:Property",
116 "owl:DatatypeProperty",
117 "owl:ObjectProperty",
118 "owl:OntologyProperty",
119 "owl:AnnotationProperty",
120 "v-ui:PropertySpecification",
121 "v-ui:DatatypePropertySpecification",
122 "v-ui:ObjectPropertySpecification",
123 "v-ui:TemplateSpecification",
124 "v-ui:ClassModel",
125 ];
126
127 Module {
128 queue_prepared_count: 0,
129 notify_channel_url,
130 is_ready_notify_channel: false,
131 max_timeout_between_batches,
132 min_batch_size_to_cancel_timeout,
133 max_batch_size,
134 subsystem_id: module_id,
135 notify_channel_read_timeout,
136 syssig_ch: None,
137 name: module_name.to_owned(),
138 onto_types: onto_types.iter().map(|x| x.to_string()).collect(),
139 }
140 }
141
142 pub fn new() -> Self {
143 Module::create(None, "")
144 }
145
146 pub fn get_property(param: &str) -> Option<String> {
147 let args: Vec<String> = env::args().collect();
148 for el in args.iter() {
149 if el.starts_with(&format!("--{}", param)) {
150 let p: Vec<&str> = el.split('=').collect();
151 if p.len() == 2 {
152 let v = p[1].trim();
153 info!("use arg --{}={}", param, v);
154 return Some(p[1].to_owned());
155 }
156 }
157 }
158
159 let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
160
161 let section = conf.section(None::<String>).expect("fail parse veda.properties");
162 if let Some(v) = section.get(param) {
163 let v = v.trim();
164 info!("use param {}={}", param, v);
165 return Some(v.to_string());
166 }
167
168 error!("param [{}] not found", param);
169 None
170 }
171
172 pub fn is_content_onto(&self, cmd: IndvOp, new_state: &mut Individual, prev_state: &mut Individual) -> bool {
173 if cmd != IndvOp::Remove {
174 if new_state.any_exists_v("rdf:type", &self.onto_types) {
175 return true;
176 }
177 } else if prev_state.any_exists_v("rdf:type", &self.onto_types) {
178 return true;
179 }
180 false
181 }
182
183 pub fn get_sys_ticket_id_from_db(storage: &mut VStorage) -> Result<String, i32> {
184 let mut indv = Individual::default();
185 if storage.get_individual_from_db(StorageId::Tickets, "systicket", &mut indv) {
186 if let Some(c) = indv.get_first_literal("v-s:resource") {
187 return Ok(c);
188 }
189 }
190 Err(-1)
191 }
192
193 pub(crate) fn connect_to_notify_channel(&mut self) -> Option<Socket> {
194 if !self.is_ready_notify_channel && !self.notify_channel_url.is_empty() {
195 let soc = Socket::new(Protocol::Sub0).unwrap();
196
197 let timeout = if let Some(t) = self.notify_channel_read_timeout {
198 t
199 } else {
200 1000
201 };
202
203 if let Err(e) = soc.set_opt::<RecvTimeout>(Some(Duration::from_millis(timeout))) {
204 error!("fail set timeout, {} err={}", self.notify_channel_url, e);
205 return None;
206 }
207
208 if let Err(e) = soc.dial(&self.notify_channel_url) {
209 error!("fail connect to, {} err={}", self.notify_channel_url, e);
210 return None;
211 } else {
212 let all_topics = vec![];
213 if let Err(e) = soc.set_opt::<Subscribe>(all_topics) {
214 error!("fail subscribe, {} err={}", self.notify_channel_url, e);
215 soc.close();
216 self.is_ready_notify_channel = false;
217 return None;
218 } else {
219 info!("success subscribe on queue changes: {}", self.notify_channel_url);
220 self.is_ready_notify_channel = true;
221 return Some(soc);
222 }
223 }
224 }
225 None
226 }
227
228 pub fn listen_queue_raw<T>(
229 &mut self,
230 queue_consumer: &mut Consumer,
231 module_context: &mut T,
232 before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
233 prepare: &mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>,
234 after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
235 heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
236 backend: &mut Backend,
237 ) {
238 self.listen_queue_comb(queue_consumer, module_context, before_batch, Some(prepare), None, after_batch, heartbeat, backend)
239 }
240
241 pub fn listen_queue<T>(
242 &mut self,
243 queue_consumer: &mut Consumer,
244 module_context: &mut T,
245 before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
246 prepare: &mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
247 after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
248 heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
249 backend: &mut Backend,
250 ) {
251 self.listen_queue_comb(queue_consumer, module_context, before_batch, None, Some(prepare), after_batch, heartbeat, backend)
252 }
253
254 fn listen_queue_comb<T>(
255 &mut self,
256 queue_consumer: &mut Consumer,
257 module_context: &mut T,
258 before_batch: &mut fn(&mut Backend, &mut T, batch_size: u32) -> Option<u32>,
259 prepare_raw: Option<&mut fn(&mut Backend, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>>,
260 prepare_indv: Option<&mut fn(&mut Backend, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>>,
261 after_batch: &mut fn(&mut Backend, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
262 heartbeat: &mut fn(&mut Backend, &mut T) -> Result<(), PrepareError>,
263 backend: &mut Backend,
264 ) {
265 if let Ok(ch) = sys_sig_listener() {
266 self.syssig_ch = Some(ch);
267 }
268
269 let mut soc = Socket::new(Protocol::Sub0).unwrap();
270 let mut count_timeout_error = 0;
271
272 let mut prev_batch_time = Instant::now();
273 let update = tick(Duration::from_millis(1));
274 loop {
275 if let Some(qq) = &self.syssig_ch {
276 select! {
277 recv(update) -> _ => {
278 }
279 recv(qq) -> _ => {
280 info!("Exit");
281 std::process::exit (exitcode::OK);
282 }
284 }
285 }
286
287 match heartbeat(backend, module_context) {
288 Err(e) => {
289 if let PrepareError::Fatal = e {
290 error!("heartbeat: found fatal error, stop listen queue");
291 break;
292 }
293 }
294 _ => {}
295 }
296
297 if let Some(s) = self.connect_to_notify_channel() {
298 soc = s;
299 }
300
301 if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
303 error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
304 continue;
305 }
306
307 let size_batch = queue_consumer.get_batch_size();
308
309 let mut max_size_batch = size_batch;
310 if let Some(m) = self.max_batch_size {
311 max_size_batch = m;
312 }
313
314 if size_batch > 0 {
315 debug!("queue: batch size={}", size_batch);
316 if let Some(new_size) = before_batch(backend, module_context, size_batch) {
317 max_size_batch = new_size;
318 }
319 }
320
321 let mut prepared_batch_size = 0;
322 for _it in 0..max_size_batch {
323 if !queue_consumer.pop_header() {
325 break;
326 }
327
328 let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
329
330 if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
332 match e {
333 ErrorQueue::FailReadTailMessage => {
334 break;
335 }
336 ErrorQueue::InvalidChecksum => {
337 error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
338 queue_consumer.seek_next_pos();
339 break;
340 }
341 _ => {
342 error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
343 break;
344 }
345 }
346 }
347
348 let mut need_commit = true;
349
350 if let Some(&mut f) = prepare_raw {
351 match f(backend, module_context, &raw, queue_consumer) {
352 Err(e) => {
353 if let PrepareError::Fatal = e {
354 warn!("prepare: found fatal error, stop listen queue");
355 return;
356 }
357 }
358 Ok(b) => {
359 need_commit = b;
360 }
361 }
362 }
363
364 if let Some(&mut f) = prepare_indv {
365 let mut queue_element = Individual::new_raw(raw);
366 if parse_raw(&mut queue_element).is_ok() {
367 let mut is_processed = true;
368 if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
369 if assigned_subsystems > 0 {
370 if let Some(my_subsystem_id) = self.subsystem_id {
371 if assigned_subsystems & my_subsystem_id == 0 {
372 is_processed = false;
373 }
374 } else {
375 is_processed = false;
376 }
377 }
378 }
379
380 if is_processed {
381 match f(backend, module_context, &mut queue_element, queue_consumer) {
382 Err(e) => {
383 if let PrepareError::Fatal = e {
384 warn!("prepare: found fatal error, stop listen queue");
385 return;
386 }
387 }
388 Ok(b) => {
389 need_commit = b;
390 }
391 }
392 }
393 }
394 }
395
396 queue_consumer.next(need_commit);
397
398 self.queue_prepared_count += 1;
399
400 if self.queue_prepared_count % 1000 == 0 {
401 info!("get from queue, count: {}", self.queue_prepared_count);
402 }
403 prepared_batch_size += 1;
404 }
405
406 if size_batch > 0 {
407 match after_batch(backend, module_context, prepared_batch_size) {
408 Ok(b) => {
409 if b {
410 queue_consumer.commit();
411 }
412 }
413 Err(e) => {
414 if let PrepareError::Fatal = e {
415 warn!("after_batch: found fatal error, stop listen queue");
416 return;
417 }
418 }
419 }
420 }
421
422 if prepared_batch_size == size_batch {
423 let wmsg = soc.recv();
424 if let Err(e) = wmsg {
425 debug!("fail recv from queue notify channel, err={:?}", e);
426
427 if count_timeout_error > 0 && size_batch > 0 {
428 warn!("queue changed but we not received notify message, need reconnect...");
429 self.is_ready_notify_channel = false;
430 count_timeout_error += 1;
431 }
432 } else {
433 count_timeout_error = 0;
434 }
435 }
436
437 if let Some(t) = self.max_timeout_between_batches {
438 let delta = prev_batch_time.elapsed().as_millis() as u64;
439 if let Some(c) = self.min_batch_size_to_cancel_timeout {
440 if prepared_batch_size < c && delta < t {
441 thread::sleep(time::Duration::from_millis(t - delta));
442 info!("sleep {} ms", t - delta);
443 }
444 } else if delta < t {
445 thread::sleep(time::Duration::from_millis(t - delta));
446 info!("sleep {} ms", t - delta);
447 }
448 }
449
450 prev_batch_time = Instant::now();
451 }
452 }
453}
454
455pub fn get_inner_binobj_as_individual<'a>(queue_element: &'a mut Individual, field_name: &str, new_indv: &'a mut Individual) -> bool {
456 let binobj = queue_element.get_first_binobj(field_name);
457 if binobj.is_some() {
458 new_indv.set_raw(&binobj.unwrap_or_default());
459 if parse_raw(new_indv).is_ok() {
460 return true;
461 }
462 }
463 false
464}
465
466pub fn get_cmd(queue_element: &mut Individual) -> Option<IndvOp> {
467 let wcmd = queue_element.get_first_integer("cmd");
468 wcmd?;
469
470 Some(IndvOp::from_i64(wcmd.unwrap_or_default()))
471}
472
473pub fn init_log(module_name: &str) {
474 init_log_with_filter(module_name, None)
475}
476
477pub fn init_log_with_filter(module_name: &str, filter: Option<&str>) {
478 let var_log_name = module_name.to_owned() + "_LOG";
479 match std::env::var_os(var_log_name.to_owned()) {
480 Some(val) => println!("use env var: {}: {:?}", var_log_name, val.to_str()),
481 None => std::env::set_var(var_log_name.to_owned(), "info"),
482 }
483
484 let filters_str = if let Some(f) = filter {
485 f.to_owned()
486 } else {
487 env::var(var_log_name).unwrap_or_default()
488 };
489
490 Builder::new()
491 .format(|buf, record| writeln!(buf, "{} [{}] - {}", Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), record.level(), record.args()))
492 .parse_filters(&filters_str)
493 .init()
494}
495
496pub fn create_new_ticket(login: &str, user_id: &str, duration: i64, ticket: &mut Ticket, storage: &mut VStorage) {
497 let mut ticket_indv = Individual::default();
498
499 ticket.result = ResultCode::FailStore;
500 ticket_indv.add_string("rdf:type", "ticket:ticket", Lang::NONE);
501
502 if !ticket.id.is_empty() && !ticket.id.is_empty() {
503 ticket_indv.set_id(&ticket.id);
504 } else {
505 ticket_indv.set_id(&Uuid::new_v4().to_hyphenated().to_string());
506 }
507
508 ticket_indv.add_string("ticket:login", login, Lang::NONE);
509 ticket_indv.add_string("ticket:accessor", user_id, Lang::NONE);
510
511 let now = Utc::now();
512 let start_time_str = format!("{:?}", now.naive_utc());
513
514 if start_time_str.len() > 28 {
515 ticket_indv.add_string("ticket:when", &start_time_str[0..28], Lang::NONE);
516 } else {
517 ticket_indv.add_string("ticket:when", &start_time_str, Lang::NONE);
518 }
519
520 ticket_indv.add_string("ticket:duration", &duration.to_string(), Lang::NONE);
521
522 let mut raw1: Vec<u8> = Vec::new();
523 if to_msgpack(&ticket_indv, &mut raw1).is_ok() && storage.put_kv_raw(StorageId::Tickets, ticket_indv.get_id(), raw1) {
524 ticket.update_from_individual(&mut ticket_indv);
525 ticket.result = ResultCode::Ok;
526 ticket.start_time = (TICKS_TO_UNIX_EPOCH + now.timestamp_millis()) * 10_000;
527 ticket.end_time = ticket.start_time + duration as i64 * 10_000_000;
528
529 let end_time_str = format!("{:?}", NaiveDateTime::from_timestamp((ticket.end_time / 10_000 - TICKS_TO_UNIX_EPOCH) / 1_000, 0));
530 info!("create new ticket {}, login={}, user={}, start={}, end={}", ticket.id, ticket.user_login, ticket.user_uri, start_time_str, end_time_str);
531 } else {
532 error!("fail store ticket {:?}", ticket)
533 }
534}
535
536pub fn create_sys_ticket(storage: &mut VStorage) -> Ticket {
537 let mut ticket = Ticket::default();
538 create_new_ticket("veda", "cfg:VedaSystem", 90_000_000, &mut ticket, storage);
539
540 if ticket.result == ResultCode::Ok {
541 let mut sys_ticket_link = Individual::default();
542 sys_ticket_link.set_id("systicket");
543 sys_ticket_link.add_uri("rdf:type", "rdfs:Resource");
544 sys_ticket_link.add_uri("v-s:resource", &ticket.id);
545 let mut raw1: Vec<u8> = Vec::new();
546 if to_msgpack(&sys_ticket_link, &mut raw1).is_ok() && storage.put_kv_raw(StorageId::Tickets, sys_ticket_link.get_id(), raw1) {
547 return ticket;
548 } else {
549 error!("fail store system ticket link")
550 }
551 } else {
552 error!("fail create sys ticket")
553 }
554
555 ticket
556}
557
558pub fn get_info_of_module(module_name: &str) -> Option<(i64, i64)> {
559 let module_info = ModuleInfo::new("./data", module_name, false);
560 if module_info.is_err() {
561 error!("fail open info of [{}], err={:?}", module_name, module_info.err());
562 return None;
563 }
564
565 let mut info = module_info.unwrap();
566 info.read_info()
567}
568
569pub fn wait_load_ontology() -> i64 {
570 wait_module("input-onto", 1)
571}
572
573pub fn wait_module(module_name: &str, wait_op_id: i64) -> i64 {
574 if wait_op_id < 0 {
575 error!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
576 return -1;
577 }
578
579 info!("wait module [{}] to complete op_id={}", module_name, wait_op_id);
580 loop {
581 let module_info = ModuleInfo::new("./data", module_name, false);
582 if module_info.is_err() {
583 error!("fail open info of [{}], err={:?}", module_name, module_info.err());
584 thread::sleep(time::Duration::from_millis(300));
585 continue;
586 }
587
588 let mut info = module_info.unwrap();
589 loop {
590 if let Some((_, committed)) = info.read_info() {
591 if committed >= wait_op_id {
592 info!("wait module [{}] to complete op_id={}, found commited_op_id={}", module_name, wait_op_id, committed);
593 return committed;
594 }
595 } else {
596 error!("fail read info for module [{}]", module_name);
597 }
599 thread::sleep(time::Duration::from_millis(300));
600 }
601
602 }
604
605 }