Skip to main content

v_common/module/
veda_module.rs

1use crate::module::common::sys_sig_listener;
2use crate::module::module_impl::{init_log, Module, PrepareError};
3use v_individual_model::onto::individual::{Individual, RawObj};
4use v_individual_model::onto::parser::parse_raw;
5use crossbeam_channel::{select, tick};
6use nng::{Protocol, Socket};
7use std::time::{Duration, Instant};
8use std::{thread, time};
9use v_queue::consumer::Consumer;
10use v_queue::record::ErrorQueue;
11
12pub trait VedaQueueModule {
13    fn before_batch(&mut self, size_batch: u32) -> Option<u32>;
14    fn prepare(&mut self, queue_element: &mut Individual) -> Result<bool, PrepareError>;
15    fn after_batch(&mut self, prepared_batch_size: u32) -> Result<bool, PrepareError>;
16    fn heartbeat(&mut self) -> Result<(), PrepareError>;
17    fn before_start(&mut self);
18    fn before_exit(&mut self);
19}
20
21impl Module {
22    pub fn prepare_queue(&mut self, veda_module: &mut dyn VedaQueueModule) {
23        init_log(&self.name);
24
25        let queue_consumer = &mut Consumer::new("./data/queue", &self.name, "individuals-flow").expect("!!!!!!!!! FAIL QUEUE");
26
27        if let Ok(ch) = sys_sig_listener() {
28            self.syssig_ch = Some(ch);
29        }
30
31        let mut soc = Socket::new(Protocol::Sub0).unwrap();
32        let mut count_timeout_error = 0;
33
34        let mut prev_batch_time = Instant::now();
35        let update = tick(Duration::from_millis(1));
36        veda_module.before_start();
37        loop {
38            if let Some(qq) = &self.syssig_ch {
39                select! {
40                    recv(update) -> _ => {
41                    }
42                    recv(qq) -> _ => {
43                        info!("Exit");
44                        veda_module.before_exit();
45                        std::process::exit (exitcode::OK);
46                        //break;
47                    }
48                }
49            }
50
51            if let Err(PrepareError::Fatal) = veda_module.heartbeat() {
52                error!("heartbeat: found fatal error, stop listen queue");
53                break;
54            }
55
56            if let Some(s) = self.connect_to_notify_channel() {
57                soc = s;
58            }
59
60            // read queue current part info
61            if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
62                error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
63                continue;
64            }
65
66            let size_batch = queue_consumer.get_batch_size();
67
68            let mut max_size_batch = size_batch;
69            if let Some(m) = self.max_batch_size {
70                max_size_batch = m;
71            }
72
73            if size_batch > 0 {
74                debug!("queue: batch size={}", size_batch);
75                if let Some(new_size) = veda_module.before_batch(size_batch) {
76                    max_size_batch = new_size;
77                }
78            }
79
80            let mut prepared_batch_size = 0;
81            for _it in 0..max_size_batch {
82                // пробуем взять из очереди заголовок сообщения
83                if !queue_consumer.pop_header() {
84                    break;
85                }
86
87                let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
88
89                // заголовок взят успешно, занесем содержимое сообщения в структуру Individual
90                if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
91                    match e {
92                        ErrorQueue::FailReadTailMessage => {
93                            break;
94                        },
95                        ErrorQueue::InvalidChecksum => {
96                            error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
97                            queue_consumer.seek_next_pos();
98                            break;
99                        },
100                        _ => {
101                            error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
102                            break;
103                        },
104                    }
105                }
106
107                let mut need_commit = true;
108                /*
109                                if let Some(&mut f) = prepare_raw {
110                                    match f(self, module, &raw, queue_consumer) {
111                                        Err(e) => {
112                                            if let PrepareError::Fatal = e {
113                                                warn!("prepare: found fatal error, stop listen queue");
114                                                return;
115                                            }
116                                        }
117                                        Ok(b) => {
118                                            need_commit = b;
119                                        }
120                                    }
121                                }
122                */
123                {
124                    let mut queue_element = Individual::new_raw(raw);
125                    if parse_raw(&mut queue_element).is_ok() {
126                        let mut is_processed = true;
127                        if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
128                            if assigned_subsystems > 0 {
129                                if let Some(my_subsystem_id) = self.subsystem_id {
130                                    if assigned_subsystems & my_subsystem_id == 0 {
131                                        is_processed = false;
132                                    }
133                                } else {
134                                    is_processed = false;
135                                }
136                            }
137                        }
138
139                        if is_processed {
140                            match veda_module.prepare(&mut queue_element) {
141                                Err(e) => {
142                                    if let PrepareError::Fatal = e {
143                                        warn!("prepare: found fatal error, stop listen queue");
144                                        return;
145                                    }
146                                },
147                                Ok(b) => {
148                                    need_commit = b;
149                                },
150                            }
151                        }
152                    }
153                }
154
155                if need_commit {
156                    queue_consumer.commit();
157                }
158
159                self.queue_prepared_count += 1;
160
161                if self.queue_prepared_count % 1000 == 0 {
162                    info!("get from queue, count: {}", self.queue_prepared_count);
163                }
164                prepared_batch_size += 1;
165            }
166
167            if size_batch > 0 {
168                match veda_module.after_batch(prepared_batch_size) {
169                    Ok(b) => {
170                        if b {
171                            queue_consumer.commit();
172                        }
173                    },
174                    Err(e) => {
175                        if let PrepareError::Fatal = e {
176                            warn!("after_batch: found fatal error, stop listen queue");
177                            return;
178                        }
179                    },
180                }
181            }
182
183            if prepared_batch_size == size_batch {
184                let wmsg = soc.recv();
185                if let Err(e) = wmsg {
186                    debug!("fail recv from queue notify channel, err={:?}", e);
187
188                    if count_timeout_error > 0 && size_batch > 0 {
189                        warn!("queue changed but we not received notify message, need reconnect...");
190                        self.is_ready_notify_channel = false;
191                        count_timeout_error += 1;
192                    }
193                } else {
194                    count_timeout_error = 0;
195                }
196            }
197
198            if let Some(t) = self.max_timeout_between_batches {
199                let delta = prev_batch_time.elapsed().as_millis() as u64;
200                if let Some(c) = self.min_batch_size_to_cancel_timeout {
201                    if prepared_batch_size < c && delta < t {
202                        thread::sleep(time::Duration::from_millis(t - delta));
203                        info!("sleep {} ms", t - delta);
204                    }
205                } else if delta < t {
206                    thread::sleep(time::Duration::from_millis(t - delta));
207                    info!("sleep {} ms", t - delta);
208                }
209            }
210
211            prev_batch_time = Instant::now();
212        }
213    }
214}