v_common/module/
veda_module.rs1use crate::module::common::sys_sig_listener;
2use crate::module::module_impl::{init_log, Module, PrepareError};
3use v_individual_model::onto::individual::{Individual, RawObj};
4use v_individual_model::onto::parser::parse_raw;
5use crossbeam_channel::{select, tick};
6use nng::{Protocol, Socket};
7use std::time::{Duration, Instant};
8use std::{thread, time};
9use v_queue::consumer::Consumer;
10use v_queue::record::ErrorQueue;
11
12pub trait VedaQueueModule {
13 fn before_batch(&mut self, size_batch: u32) -> Option<u32>;
14 fn prepare(&mut self, queue_element: &mut Individual) -> Result<bool, PrepareError>;
15 fn after_batch(&mut self, prepared_batch_size: u32) -> Result<bool, PrepareError>;
16 fn heartbeat(&mut self) -> Result<(), PrepareError>;
17 fn before_start(&mut self);
18 fn before_exit(&mut self);
19}
20
21impl Module {
22 pub fn prepare_queue(&mut self, veda_module: &mut dyn VedaQueueModule) {
23 init_log(&self.name);
24
25 let queue_consumer = &mut Consumer::new("./data/queue", &self.name, "individuals-flow").expect("!!!!!!!!! FAIL QUEUE");
26
27 if let Ok(ch) = sys_sig_listener() {
28 self.syssig_ch = Some(ch);
29 }
30
31 let mut soc = Socket::new(Protocol::Sub0).unwrap();
32 let mut count_timeout_error = 0;
33
34 let mut prev_batch_time = Instant::now();
35 let update = tick(Duration::from_millis(1));
36 veda_module.before_start();
37 loop {
38 if let Some(qq) = &self.syssig_ch {
39 select! {
40 recv(update) -> _ => {
41 }
42 recv(qq) -> _ => {
43 info!("Exit");
44 veda_module.before_exit();
45 std::process::exit (exitcode::OK);
46 }
48 }
49 }
50
51 if let Err(PrepareError::Fatal) = veda_module.heartbeat() {
52 error!("heartbeat: found fatal error, stop listen queue");
53 break;
54 }
55
56 if let Some(s) = self.connect_to_notify_channel() {
57 soc = s;
58 }
59
60 if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
62 error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
63 continue;
64 }
65
66 let size_batch = queue_consumer.get_batch_size();
67
68 let mut max_size_batch = size_batch;
69 if let Some(m) = self.max_batch_size {
70 max_size_batch = m;
71 }
72
73 if size_batch > 0 {
74 debug!("queue: batch size={}", size_batch);
75 if let Some(new_size) = veda_module.before_batch(size_batch) {
76 max_size_batch = new_size;
77 }
78 }
79
80 let mut prepared_batch_size = 0;
81 for _it in 0..max_size_batch {
82 if !queue_consumer.pop_header() {
84 break;
85 }
86
87 let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
88
89 if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
91 match e {
92 ErrorQueue::FailReadTailMessage => {
93 break;
94 },
95 ErrorQueue::InvalidChecksum => {
96 error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
97 queue_consumer.seek_next_pos();
98 break;
99 },
100 _ => {
101 error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
102 break;
103 },
104 }
105 }
106
107 let mut need_commit = true;
108 {
124 let mut queue_element = Individual::new_raw(raw);
125 if parse_raw(&mut queue_element).is_ok() {
126 let mut is_processed = true;
127 if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
128 if assigned_subsystems > 0 {
129 if let Some(my_subsystem_id) = self.subsystem_id {
130 if assigned_subsystems & my_subsystem_id == 0 {
131 is_processed = false;
132 }
133 } else {
134 is_processed = false;
135 }
136 }
137 }
138
139 if is_processed {
140 match veda_module.prepare(&mut queue_element) {
141 Err(e) => {
142 if let PrepareError::Fatal = e {
143 warn!("prepare: found fatal error, stop listen queue");
144 return;
145 }
146 },
147 Ok(b) => {
148 need_commit = b;
149 },
150 }
151 }
152 }
153 }
154
155 if need_commit {
156 queue_consumer.commit();
157 }
158
159 self.queue_prepared_count += 1;
160
161 if self.queue_prepared_count % 1000 == 0 {
162 info!("get from queue, count: {}", self.queue_prepared_count);
163 }
164 prepared_batch_size += 1;
165 }
166
167 if size_batch > 0 {
168 match veda_module.after_batch(prepared_batch_size) {
169 Ok(b) => {
170 if b {
171 queue_consumer.commit();
172 }
173 },
174 Err(e) => {
175 if let PrepareError::Fatal = e {
176 warn!("after_batch: found fatal error, stop listen queue");
177 return;
178 }
179 },
180 }
181 }
182
183 if prepared_batch_size == size_batch {
184 let wmsg = soc.recv();
185 if let Err(e) = wmsg {
186 debug!("fail recv from queue notify channel, err={:?}", e);
187
188 if count_timeout_error > 0 && size_batch > 0 {
189 warn!("queue changed but we not received notify message, need reconnect...");
190 self.is_ready_notify_channel = false;
191 count_timeout_error += 1;
192 }
193 } else {
194 count_timeout_error = 0;
195 }
196 }
197
198 if let Some(t) = self.max_timeout_between_batches {
199 let delta = prev_batch_time.elapsed().as_millis() as u64;
200 if let Some(c) = self.min_batch_size_to_cancel_timeout {
201 if prepared_batch_size < c && delta < t {
202 thread::sleep(time::Duration::from_millis(t - delta));
203 info!("sleep {} ms", t - delta);
204 }
205 } else if delta < t {
206 thread::sleep(time::Duration::from_millis(t - delta));
207 info!("sleep {} ms", t - delta);
208 }
209 }
210
211 prev_batch_time = Instant::now();
212 }
213 }
214}