v_common_module/
veda_module.rs1use crate::common::sys_sig_listener;
2use crate::module::{init_log, Module, PrepareError};
3use crate::v_onto::individual::*;
4use crate::v_onto::parser::parse_raw;
5use crossbeam_channel::{select, tick};
6use nng::{Protocol, Socket};
7use std::time::{Duration, Instant};
8use std::{thread, time};
9use v_queue::consumer::Consumer;
10use v_queue::record::ErrorQueue;
11
12pub trait VedaQueueModule {
13 fn before_batch(&mut self, size_batch: u32) -> Option<u32>;
14 fn prepare(&mut self, queue_element: &mut Individual) -> Result<bool, PrepareError>;
15 fn after_batch(&mut self, prepared_batch_size: u32) -> Result<bool, PrepareError>;
16 fn heartbeat(&mut self) -> Result<(), PrepareError>;
17 fn before_start(&mut self);
18 fn before_exit(&mut self);
19}
20
21impl Module {
22 pub fn prepare_queue(&mut self, veda_module: &mut dyn VedaQueueModule) {
23 init_log(&self.name);
24
25 let queue_consumer = &mut Consumer::new("./data/queue", &self.name, "individuals-flow").expect("!!!!!!!!! FAIL QUEUE");
26
27 if let Ok(ch) = sys_sig_listener() {
28 self.syssig_ch = Some(ch);
29 }
30
31 let mut soc = Socket::new(Protocol::Sub0).unwrap();
32 let mut count_timeout_error = 0;
33
34 let mut prev_batch_time = Instant::now();
35 let update = tick(Duration::from_millis(1));
36 veda_module.before_start();
37 loop {
38 if let Some(qq) = &self.syssig_ch {
39 select! {
40 recv(update) -> _ => {
41 }
42 recv(qq) -> _ => {
43 info!("Exit");
44 veda_module.before_exit();
45 std::process::exit (exitcode::OK);
46 }
48 }
49 }
50
51 match veda_module.heartbeat() {
52 Err(e) => {
53 if let PrepareError::Fatal = e {
54 error!("heartbeat: found fatal error, stop listen queue");
55 break;
56 }
57 }
58 _ => {}
59 }
60
61 if let Some(s) = self.connect_to_notify_channel() {
62 soc = s;
63 }
64
65 if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
67 error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
68 continue;
69 }
70
71 let size_batch = queue_consumer.get_batch_size();
72
73 let mut max_size_batch = size_batch;
74 if let Some(m) = self.max_batch_size {
75 max_size_batch = m;
76 }
77
78 if size_batch > 0 {
79 debug!("queue: batch size={}", size_batch);
80 if let Some(new_size) = veda_module.before_batch(size_batch) {
81 max_size_batch = new_size;
82 }
83 }
84
85 let mut prepared_batch_size = 0;
86 for _it in 0..max_size_batch {
87 if !queue_consumer.pop_header() {
89 break;
90 }
91
92 let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
93
94 if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
96 match e {
97 ErrorQueue::FailReadTailMessage => {
98 break;
99 }
100 ErrorQueue::InvalidChecksum => {
101 error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
102 queue_consumer.seek_next_pos();
103 break;
104 }
105 _ => {
106 error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
107 break;
108 }
109 }
110 }
111
112 let mut need_commit = true;
113 {
129 let mut queue_element = Individual::new_raw(raw);
130 if parse_raw(&mut queue_element).is_ok() {
131 let mut is_processed = true;
132 if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
133 if assigned_subsystems > 0 {
134 if let Some(my_subsystem_id) = self.subsystem_id {
135 if assigned_subsystems & my_subsystem_id == 0 {
136 is_processed = false;
137 }
138 } else {
139 is_processed = false;
140 }
141 }
142 }
143
144 if is_processed {
145 match veda_module.prepare(&mut queue_element) {
146 Err(e) => {
147 if let PrepareError::Fatal = e {
148 warn!("prepare: found fatal error, stop listen queue");
149 return;
150 }
151 }
152 Ok(b) => {
153 need_commit = b;
154 }
155 }
156 }
157 }
158 }
159
160 queue_consumer.next(need_commit);
161
162 self.queue_prepared_count += 1;
163
164 if self.queue_prepared_count % 1000 == 0 {
165 info!("get from queue, count: {}", self.queue_prepared_count);
166 }
167 prepared_batch_size += 1;
168 }
169
170 if size_batch > 0 {
171 match veda_module.after_batch(prepared_batch_size) {
172 Ok(b) => {
173 if b {
174 queue_consumer.commit();
175 }
176 }
177 Err(e) => {
178 if let PrepareError::Fatal = e {
179 warn!("after_batch: found fatal error, stop listen queue");
180 return;
181 }
182 }
183 }
184 }
185
186 if prepared_batch_size == size_batch {
187 let wmsg = soc.recv();
188 if let Err(e) = wmsg {
189 debug!("fail recv from queue notify channel, err={:?}", e);
190
191 if count_timeout_error > 0 && size_batch > 0 {
192 warn!("queue changed but we not received notify message, need reconnect...");
193 self.is_ready_notify_channel = false;
194 count_timeout_error += 1;
195 }
196 } else {
197 count_timeout_error = 0;
198 }
199 }
200
201 if let Some(t) = self.max_timeout_between_batches {
202 let delta = prev_batch_time.elapsed().as_millis() as u64;
203 if let Some(c) = self.min_batch_size_to_cancel_timeout {
204 if prepared_batch_size < c && delta < t {
205 thread::sleep(time::Duration::from_millis(t - delta));
206 info!("sleep {} ms", t - delta);
207 }
208 } else if delta < t {
209 thread::sleep(time::Duration::from_millis(t - delta));
210 info!("sleep {} ms", t - delta);
211 }
212 }
213
214 prev_batch_time = Instant::now();
215 }
216 }
217}