Skip to main content

v_common/module/
module_impl.rs

1//! Queue consumer runtime — see `v-module-queue`.
2
3pub use v_module_queue::{
4    get_cmd, get_inner_binobj_as_individual, get_info_of_module, init_log,
5    init_log_with_filter, init_log_with_params, Module, PrepareError,
6};
7
8use crate::module::info::ModuleInfo;
9use std::{thread, time};
10
11pub fn wait_load_ontology() -> i64 {
12    wait_module("input-onto", 1)
13}
14
15pub fn wait_module(module_name: &str, wait_op_id: i64) -> i64 {
16    if wait_op_id < 0 {
17        error!(
18            "wait module [{}] to complete op_id={}",
19            module_name, wait_op_id
20        );
21        return -1;
22    }
23
24    info!(
25        "wait module [{}] to complete op_id={}",
26        module_name, wait_op_id
27    );
28    loop {
29        let module_info = ModuleInfo::new("./data", module_name, false);
30        if module_info.is_err() {
31            error!(
32                "fail open info of [{}], err={:?}",
33                module_name,
34                module_info.err()
35            );
36            thread::sleep(time::Duration::from_millis(300));
37            continue;
38        }
39
40        let mut info = module_info.unwrap();
41        loop {
42            if let Some((_, committed)) = info.read_info() {
43                if committed >= wait_op_id {
44                    info!(
45                        "wait module [{}] to complete op_id={}, found commited_op_id={}",
46                        module_name, wait_op_id, committed
47                    );
48                    return committed;
49                }
50            } else {
51                error!("fail read info for module [{}]", module_name);
52            }
53            thread::sleep(time::Duration::from_millis(300));
54        }
55    }
56}