Skip to main content

v_module_queue/
module.rs

1use crate::info::ModuleInfo;
2use crate::signals::sys_sig_listener;
3use crate::api::IndvOp;
4use chrono::Local;
5use crossbeam_channel::{select, tick, Receiver};
6use env_logger::Builder;
7use ini::Ini;
8use nng::options::protocol::pubsub::Subscribe;
9use nng::options::Options;
10use nng::options::RecvTimeout;
11use nng::{Protocol, Socket};
12use std::io::Write;
13use std::str::FromStr;
14use std::time::Duration;
15use std::time::Instant;
16use std::{env, thread, time};
17use v_individual_model::onto::individual::{Individual, RawObj};
18use v_individual_model::onto::parser::parse_raw;
19use v_queue::consumer::*;
20use v_queue::record::*;
21
22#[derive(Debug)]
23#[repr(u8)]
24pub enum PrepareError {
25    Fatal = 101,
26    Recoverable = 102,
27}
28
29const NOTIFY_CHANNEL_RECONNECT_TIMEOUT: u64 = 300;
30
31pub struct Module {
32    pub(crate) queue_prepared_count: i64,
33    notify_channel_url: String,
34    pub(crate) is_ready_notify_channel: bool,
35    notify_channel_read_timeout: Option<u64>,
36    pub(crate) max_timeout_between_batches: Option<u64>,
37    pub(crate) min_batch_size_to_cancel_timeout: Option<u32>,
38    pub max_batch_size: Option<u32>,
39    pub(crate) subsystem_id: Option<i64>,
40    pub(crate) syssig_ch: Option<Receiver<i32>>,
41    pub name: String,
42    pub onto_types: Vec<String>,
43}
44
45impl Default for Module {
46    fn default() -> Self {
47        Module::create(None, "")
48    }
49}
50
51impl Module {
52    pub fn new_with_name(name: &str) -> Self {
53        Module::create(None, name)
54    }
55
56    pub fn create(module_id: Option<i64>, module_name: &str) -> Self {
57        let args: Vec<String> = env::args().collect();
58
59        let mut notify_channel_url = String::default();
60        let mut max_timeout_between_batches = None;
61        let mut min_batch_size_to_cancel_timeout = None;
62        let mut max_batch_size = None;
63        let mut notify_channel_read_timeout = None;
64
65        for el in args.iter() {
66            if el.starts_with("--max_timeout_between_batches") {
67                let p: Vec<&str> = el.split('=').collect();
68                if let Ok(v) = p[1].parse::<u64>() {
69                    max_timeout_between_batches = Some(v);
70                    info!("use {} = {} ms", p[0], v);
71                }
72            } else if el.starts_with("--min_batch_size_to_cancel_timeout") {
73                let p: Vec<&str> = el.split('=').collect();
74                if let Ok(v) = p[1].parse::<u32>() {
75                    min_batch_size_to_cancel_timeout = Some(v);
76                    info!("use {} = {}", p[0], v);
77                }
78            } else if el.starts_with("--max_batch_size") {
79                let p: Vec<&str> = el.split('=').collect();
80                if let Ok(v) = p[1].parse::<u32>() {
81                    max_batch_size = Some(v);
82                    println!("use {} = {}", p[0], v);
83                }
84            } else if el.starts_with("--notify_channel_read_timeout") {
85                let p: Vec<&str> = el.split('=').collect();
86                if let Ok(v) = p[1].parse::<u64>() {
87                    notify_channel_read_timeout = Some(v);
88                    info!("use {} = {} ms", p[0], v);
89                }
90            } else if el.starts_with("--notify_channel_url") {
91                let p: Vec<&str> = el.split('=').collect();
92                notify_channel_url = p[1].to_owned();
93            }
94        }
95
96        if notify_channel_url.is_empty() {
97            if let Some(s) = Module::get_property("notify_channel_url") {
98                notify_channel_url = s;
99            }
100        }
101
102        let onto_types = vec![
103            "rdfs:Class",
104            "owl:Class",
105            "rdfs:Datatype",
106            "owl:Ontology",
107            "rdf:Property",
108            "owl:DatatypeProperty",
109            "owl:ObjectProperty",
110            "owl:OntologyProperty",
111            "owl:AnnotationProperty",
112            "v-ui:PropertySpecification",
113            "v-ui:DatatypePropertySpecification",
114            "v-ui:ObjectPropertySpecification",
115            "v-ui:TemplateSpecification",
116            "v-ui:ClassModel",
117        ]
118        .into_iter()
119        .map(|s| s.to_string())
120        .collect();
121
122        Module {
123            queue_prepared_count: 0,
124            notify_channel_url,
125            is_ready_notify_channel: false,
126            max_timeout_between_batches,
127            min_batch_size_to_cancel_timeout,
128            max_batch_size,
129            subsystem_id: module_id,
130            notify_channel_read_timeout,
131            syssig_ch: None,
132            name: module_name.to_owned(),
133            onto_types,
134        }
135    }
136
137    pub fn new() -> Self {
138        Module::create(None, "")
139    }
140
141    pub fn is_content_onto(
142        &self,
143        cmd: IndvOp,
144        new_state: &mut Individual,
145        prev_state: &mut Individual,
146    ) -> bool {
147        if cmd != IndvOp::Remove {
148            if new_state.any_exists_v("rdf:type", &self.onto_types) {
149                return true;
150            }
151        } else if prev_state.any_exists_v("rdf:type", &self.onto_types) {
152            return true;
153        }
154        false
155    }
156
157    pub fn get_property<T: FromStr>(in_param: &str) -> Option<T> {
158        let conf = Ini::load_from_file("veda.properties").expect("fail load veda.properties file");
159
160        let aliases = conf
161            .section(Some("alias"))
162            .expect("fail parse veda.properties, section [alias]");
163
164        let args: Vec<String> = env::args().collect();
165
166        let params = [in_param.replace('_', "-"), in_param.replace('-', "_")];
167
168        for el in args.iter() {
169            for param in &params {
170                if el.starts_with(&format!("--{}", param)) {
171                    let p: Vec<&str> = el.split('=').collect();
172
173                    if p.len() == 2 {
174                        let v = p[1].trim();
175                        let val = if let Some(a) = aliases.get(v) {
176                            info!("use arg --{}={}, alias={}", param, a, v);
177                            a
178                        } else {
179                            info!("use arg --{}={}", param, v);
180                            v
181                        };
182
183                        return val.parse().ok();
184                    }
185                }
186            }
187        }
188
189        let section = conf.section(None::<String>).expect("fail parse veda.properties");
190
191        if let Some(v) = section.get(in_param) {
192            let mut val = v.trim().to_owned();
193
194            if val.starts_with('$') {
195                if let Ok(val4var) = env::var(val.strip_prefix('$').unwrap_or_default()) {
196                    info!("get env variable [{}]", val);
197                    val = val4var;
198                } else {
199                    info!("not found env variable {}", val);
200                    return None;
201                }
202            }
203
204            let res = if let Some(a) = aliases.get(&val) {
205                info!("use param [{}]={}, alias={}", in_param, a, val);
206                a
207            } else {
208                info!("use param [{}]={}", in_param, val);
209                &val
210            };
211
212            return res.parse().ok();
213        }
214
215        error!("param [{}] not found", in_param);
216        None
217    }
218
219    pub fn connect_to_notify_channel(&mut self) -> Option<Socket> {
220        if !self.is_ready_notify_channel && !self.notify_channel_url.is_empty() {
221            let soc = Socket::new(Protocol::Sub0).unwrap();
222
223            let timeout = if let Some(t) = self.notify_channel_read_timeout {
224                t
225            } else {
226                1000
227            };
228
229            if let Err(e) = soc.set_opt::<RecvTimeout>(Some(Duration::from_millis(timeout))) {
230                error!("fail set timeout, {} err={}", self.notify_channel_url, e);
231                return None;
232            }
233
234            if let Err(e) = soc.dial(&self.notify_channel_url) {
235                error!("fail connect to, {} err={}", self.notify_channel_url, e);
236                return None;
237            } else {
238                let all_topics = vec![];
239                if let Err(e) = soc.set_opt::<Subscribe>(all_topics) {
240                    error!("fail subscribe, {} err={}", self.notify_channel_url, e);
241                    soc.close();
242                    self.is_ready_notify_channel = false;
243                    return None;
244                } else {
245                    info!(
246                        "success subscribe on queue changes: {}",
247                        self.notify_channel_url
248                    );
249                    self.is_ready_notify_channel = true;
250                    return Some(soc);
251                }
252            }
253        }
254        None
255    }
256
257    pub fn listen_queue_raw<T, B>(
258        &mut self,
259        queue_consumer: &mut Consumer,
260        module_context: &mut T,
261        before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
262        prepare: &mut fn(&mut B, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>,
263        after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
264        heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
265        backend: &mut B,
266    ) {
267        self.listen_queue_comb(
268            queue_consumer,
269            module_context,
270            before_batch,
271            Some(prepare),
272            None,
273            after_batch,
274            heartbeat,
275            backend,
276        )
277    }
278
279    pub fn listen_queue<T, B>(
280        &mut self,
281        queue_consumer: &mut Consumer,
282        module_context: &mut T,
283        before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
284        prepare: &mut fn(&mut B, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
285        after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
286        heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
287        backend: &mut B,
288    ) {
289        self.listen_queue_comb(
290            queue_consumer,
291            module_context,
292            before_batch,
293            None,
294            Some(prepare),
295            after_batch,
296            heartbeat,
297            backend,
298        )
299    }
300
301    fn listen_queue_comb<T, B>(
302        &mut self,
303        queue_consumer: &mut Consumer,
304        module_context: &mut T,
305        before_batch: &mut fn(&mut B, &mut T, batch_size: u32) -> Option<u32>,
306        prepare_raw: Option<&mut fn(&mut B, &mut T, &RawObj, &Consumer) -> Result<bool, PrepareError>>,
307        prepare_indv: Option<
308            &mut fn(&mut B, &mut T, &mut Individual, &Consumer) -> Result<bool, PrepareError>,
309        >,
310        after_batch: &mut fn(&mut B, &mut T, prepared_batch_size: u32) -> Result<bool, PrepareError>,
311        heartbeat: &mut fn(&mut B, &mut T) -> Result<(), PrepareError>,
312        backend: &mut B,
313    ) {
314        if let Ok(ch) = sys_sig_listener() {
315            self.syssig_ch = Some(ch);
316        }
317
318        let mut soc = None;
319        let mut count_timeout_error = 0;
320
321        let mut prev_batch_time = Instant::now();
322        let update = tick(Duration::from_millis(1));
323        loop {
324            if let Some(qq) = &self.syssig_ch {
325                select! {
326                    recv(update) -> _ => {
327                    }
328                    recv(qq) -> _ => {
329                        info!("queue {}/{}, part:{}, pos:{}", queue_consumer.queue.base_path, queue_consumer.name, queue_consumer.id, queue_consumer.count_popped);
330                        info!("Exit");
331                        std::process::exit(exitcode::OK);
332                    }
333                }
334            }
335
336            if let Err(PrepareError::Fatal) = heartbeat(backend, module_context) {
337                error!("heartbeat: found fatal error, stop listen queue");
338                break;
339            }
340
341            if soc.is_none() {
342                soc = self.connect_to_notify_channel();
343                if soc.is_none() {
344                    thread::sleep(time::Duration::from_millis(NOTIFY_CHANNEL_RECONNECT_TIMEOUT));
345                    info!("sleep {} ms", NOTIFY_CHANNEL_RECONNECT_TIMEOUT);
346                }
347            }
348
349            if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
350                error!(
351                    "{} get_info_of_part {}: {}",
352                    self.queue_prepared_count,
353                    queue_consumer.id,
354                    e.as_str()
355                );
356                continue;
357            }
358
359            let size_batch = queue_consumer.get_batch_size();
360
361            let mut max_size_batch = size_batch;
362            if let Some(m) = self.max_batch_size {
363                max_size_batch = m;
364            }
365
366            if size_batch > 0 {
367                debug!("queue: batch size={}", size_batch);
368                if let Some(new_size) = before_batch(backend, module_context, size_batch) {
369                    max_size_batch = new_size;
370                }
371            }
372
373            let mut prepared_batch_size = 0;
374            for _it in 0..max_size_batch {
375                if !queue_consumer.pop_header() {
376                    break;
377                }
378
379                let mut raw = RawObj::new(vec![0; queue_consumer.record_len()]);
380
381                if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
382                    match e {
383                        ErrorQueue::FailReadTailMessage => {
384                            break;
385                        }
386                        ErrorQueue::InvalidChecksum => {
387                            error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
388                            queue_consumer.seek_next_pos();
389                            break;
390                        }
391                        _ => {
392                            error!(
393                                "{} get msg from queue: {}",
394                                self.queue_prepared_count,
395                                e.as_str()
396                            );
397                            break;
398                        }
399                    }
400                }
401
402                let mut need_commit = true;
403
404                if let Some(&mut f) = prepare_raw {
405                    match f(backend, module_context, &raw, queue_consumer) {
406                        Err(e) => {
407                            if let PrepareError::Fatal = e {
408                                warn!("prepare: found fatal error, stop listen queue");
409                                return;
410                            }
411                        }
412                        Ok(b) => {
413                            need_commit = b;
414                        }
415                    }
416                }
417
418                if let Some(&mut f) = prepare_indv {
419                    let mut queue_element = Individual::new_raw(raw);
420                    if parse_raw(&mut queue_element).is_ok() {
421                        let mut is_processed = true;
422                        if let Some(assigned_subsystems) =
423                            queue_element.get_first_integer("assigned_subsystems")
424                        {
425                            if assigned_subsystems > 0 {
426                                if let Some(my_subsystem_id) = self.subsystem_id {
427                                    if assigned_subsystems & my_subsystem_id == 0 {
428                                        is_processed = false;
429                                    }
430                                } else {
431                                    is_processed = false;
432                                }
433                            }
434                        }
435
436                        if is_processed {
437                            match f(backend, module_context, &mut queue_element, queue_consumer) {
438                                Err(e) => {
439                                    if let PrepareError::Fatal = e {
440                                        warn!("prepare: found fatal error, stop listen queue");
441                                        return;
442                                    }
443                                }
444                                Ok(b) => {
445                                    need_commit = b;
446                                }
447                            }
448                        }
449                    }
450                }
451
452                if need_commit {
453                    queue_consumer.commit();
454                }
455
456                self.queue_prepared_count += 1;
457
458                if self.queue_prepared_count % 1000 == 0 {
459                    info!("get from queue, count: {}", self.queue_prepared_count);
460                }
461                prepared_batch_size += 1;
462            }
463
464            if size_batch > 0 {
465                match after_batch(backend, module_context, prepared_batch_size) {
466                    Ok(b) => {
467                        if b {
468                            queue_consumer.commit();
469                        }
470                    }
471                    Err(e) => {
472                        if let PrepareError::Fatal = e {
473                            warn!("after_batch: found fatal error, stop listen queue");
474                            return;
475                        }
476                    }
477                }
478            }
479
480            if prepared_batch_size == size_batch {
481                if let Some(s) = &soc {
482                    let wmsg = s.recv();
483                    if let Err(e) = wmsg {
484                        debug!("fail recv from queue notify channel, err={:?}", e);
485
486                        if count_timeout_error > 0 && size_batch > 0 {
487                            warn!("queue changed but we not received notify message, need reconnect...");
488                            self.is_ready_notify_channel = false;
489                            count_timeout_error += 1;
490                        }
491                    } else {
492                        count_timeout_error = 0;
493                    }
494                }
495            }
496
497            if let Some(t) = self.max_timeout_between_batches {
498                let delta = prev_batch_time.elapsed().as_millis() as u64;
499                if let Some(c) = self.min_batch_size_to_cancel_timeout {
500                    if prepared_batch_size < c && delta < t {
501                        thread::sleep(time::Duration::from_millis(t - delta));
502                        info!("sleep {} ms", t - delta);
503                    }
504                } else if delta < t {
505                    thread::sleep(time::Duration::from_millis(t - delta));
506                    info!("sleep {} ms", t - delta);
507                }
508            }
509
510            prev_batch_time = Instant::now();
511        }
512    }
513}
514
515pub fn get_inner_binobj_as_individual<'a>(
516    queue_element: &'a mut Individual,
517    field_name: &str,
518    new_indv: &'a mut Individual,
519) -> bool {
520    let binobj = queue_element.get_first_binobj(field_name);
521    if binobj.is_some() {
522        new_indv.set_raw(&binobj.unwrap_or_default());
523        if parse_raw(new_indv).is_ok() {
524            return true;
525        }
526    }
527    false
528}
529
530pub fn get_cmd(queue_element: &mut Individual) -> Option<IndvOp> {
531    let wcmd = queue_element.get_first_integer("cmd");
532    wcmd?;
533
534    Some(IndvOp::from_i64(wcmd.unwrap_or_default()))
535}
536
537pub fn init_log(module_name: &str) {
538    init_log_with_filter(module_name, None)
539}
540
541pub fn init_log_with_filter(module_name: &str, filter: Option<&str>) {
542    init_log_with_params(module_name, filter, false);
543}
544
545pub fn init_log_with_params(module_name: &str, filter: Option<&str>, with_thread_id: bool) {
546    let var_log_name = module_name.to_owned() + "_LOG";
547
548    let filters_str = match env::var(&var_log_name) {
549        Ok(val) if !val.is_empty() => {
550            println!("use env var: {}: {:?}", var_log_name, val);
551            val
552        }
553        _ => filter.unwrap_or("info").to_owned(),
554    };
555
556    if with_thread_id {
557        Builder::new()
558            .format(|buf, record| {
559                writeln!(
560                    buf,
561                    "{} {} [{}] - {}",
562                    thread_id::get(),
563                    Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
564                    record.level(),
565                    record.args()
566                )
567            })
568            .parse_filters(&filters_str)
569            .try_init()
570            .unwrap_or(())
571    } else {
572        Builder::new()
573            .format(|buf, record| {
574                writeln!(
575                    buf,
576                    "{} [{}] - {}",
577                    Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
578                    record.level(),
579                    record.args()
580                )
581            })
582            .parse_filters(&filters_str)
583            .try_init()
584            .unwrap_or(())
585    }
586}
587
588pub fn get_info_of_module(module_name: &str) -> Option<(i64, i64)> {
589    let module_info = ModuleInfo::new("./data", module_name, false);
590    if module_info.is_err() {
591        error!(
592            "fail open info of [{}], err={:?}",
593            module_name,
594            module_info.err()
595        );
596        return None;
597    }
598
599    let mut info = module_info.unwrap();
600    info.read_info()
601}