v_module_queue/
veda_module.rs1use crate::module::{init_log, Module, PrepareError};
2use crate::signals::sys_sig_listener;
3use crossbeam_channel::{select, tick};
4use exitcode;
5use nng::{Protocol, Socket};
6use std::time::{Duration, Instant};
7use std::{thread, time};
8use v_individual_model::onto::individual::{Individual, RawObj};
9use v_individual_model::onto::parser::parse_raw;
10use v_queue::consumer::Consumer;
11use v_queue::record::ErrorQueue;
12
13pub trait VedaQueueModule {
14 fn before_batch(&mut self, size_batch: u32) -> Option<u32>;
15 fn prepare(&mut self, queue_element: &mut Individual) -> Result<bool, PrepareError>;
16 fn after_batch(&mut self, prepared_batch_size: u32) -> Result<bool, PrepareError>;
17 fn heartbeat(&mut self) -> Result<(), PrepareError>;
18 fn before_start(&mut self);
19 fn before_exit(&mut self);
20}
21
22impl Module {
23 pub fn prepare_queue(&mut self, veda_module: &mut dyn VedaQueueModule) {
24 init_log(&self.name);
25
26 let queue_consumer =
27 &mut Consumer::new("./data/queue", &self.name, "individuals-flow")
28 .expect("!!!!!!!!! FAIL QUEUE");
29
30 if let Ok(ch) = sys_sig_listener() {
31 self.syssig_ch = Some(ch);
32 }
33
34 let mut soc = Socket::new(Protocol::Sub0).unwrap();
35 let mut count_timeout_error = 0;
36
37 let mut prev_batch_time = Instant::now();
38 let update = tick(Duration::from_millis(1));
39 veda_module.before_start();
40 loop {
41 if let Some(qq) = &self.syssig_ch {
42 select! {
43 recv(update) -> _ => {
44 }
45 recv(qq) -> _ => {
46 info!("Exit");
47 veda_module.before_exit();
48 std::process::exit(exitcode::OK);
49 }
50 }
51 }
52
53 if let Err(PrepareError::Fatal) = veda_module.heartbeat() {
54 error!("heartbeat: found fatal error, stop listen queue");
55 break;
56 }
57
58 if let Some(s) = self.connect_to_notify_channel() {
59 soc = s;
60 }
61
62 if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
63 error!(
64 "{} get_info_of_part {}: {}",
65 self.queue_prepared_count,
66 queue_consumer.id,
67 e.as_str()
68 );
69 continue;
70 }
71
72 let size_batch = queue_consumer.get_batch_size();
73
74 let mut max_size_batch = size_batch;
75 if let Some(m) = self.max_batch_size {
76 max_size_batch = m;
77 }
78
79 if size_batch > 0 {
80 debug!("queue: batch size={}", size_batch);
81 if let Some(new_size) = veda_module.before_batch(size_batch) {
82 max_size_batch = new_size;
83 }
84 }
85
86 let mut prepared_batch_size = 0;
87 for _it in 0..max_size_batch {
88 if !queue_consumer.pop_header() {
89 break;
90 }
91
92 let mut raw = RawObj::new(vec![0; queue_consumer.record_len()]);
93
94 if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
95 match e {
96 ErrorQueue::FailReadTailMessage => break,
97 ErrorQueue::InvalidChecksum => {
98 error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
99 queue_consumer.seek_next_pos();
100 break;
101 }
102 _ => {
103 error!(
104 "{} get msg from queue: {}",
105 self.queue_prepared_count,
106 e.as_str()
107 );
108 break;
109 }
110 }
111 }
112
113 let mut need_commit = true;
114
115 let mut queue_element = Individual::new_raw(raw);
116 if parse_raw(&mut queue_element).is_ok() {
117 let mut is_processed = true;
118 if let Some(assigned_subsystems) =
119 queue_element.get_first_integer("assigned_subsystems")
120 {
121 if assigned_subsystems > 0 {
122 if let Some(my_subsystem_id) = self.subsystem_id {
123 if assigned_subsystems & my_subsystem_id == 0 {
124 is_processed = false;
125 }
126 } else {
127 is_processed = false;
128 }
129 }
130 }
131
132 if is_processed {
133 match veda_module.prepare(&mut queue_element) {
134 Err(e) => {
135 if let PrepareError::Fatal = e {
136 warn!("prepare: found fatal error, stop listen queue");
137 return;
138 }
139 }
140 Ok(b) => {
141 need_commit = b;
142 }
143 }
144 }
145 }
146
147 if need_commit {
148 queue_consumer.commit();
149 }
150
151 self.queue_prepared_count += 1;
152
153 if self.queue_prepared_count % 1000 == 0 {
154 info!("get from queue, count: {}", self.queue_prepared_count);
155 }
156 prepared_batch_size += 1;
157 }
158
159 if size_batch > 0 {
160 match veda_module.after_batch(prepared_batch_size) {
161 Ok(b) => {
162 if b {
163 queue_consumer.commit();
164 }
165 }
166 Err(e) => {
167 if let PrepareError::Fatal = e {
168 warn!("after_batch: found fatal error, stop listen queue");
169 return;
170 }
171 }
172 }
173 }
174
175 if prepared_batch_size == size_batch {
176 let wmsg = soc.recv();
177 if let Err(e) = wmsg {
178 debug!("fail recv from queue notify channel, err={:?}", e);
179
180 if count_timeout_error > 0 && size_batch > 0 {
181 warn!("queue changed but we not received notify message, need reconnect...");
182 self.is_ready_notify_channel = false;
183 count_timeout_error += 1;
184 }
185 } else {
186 count_timeout_error = 0;
187 }
188 }
189
190 if let Some(t) = self.max_timeout_between_batches {
191 let delta = prev_batch_time.elapsed().as_millis() as u64;
192 if let Some(c) = self.min_batch_size_to_cancel_timeout {
193 if prepared_batch_size < c && delta < t {
194 thread::sleep(time::Duration::from_millis(t - delta));
195 info!("sleep {} ms", t - delta);
196 }
197 } else if delta < t {
198 thread::sleep(time::Duration::from_millis(t - delta));
199 info!("sleep {} ms", t - delta);
200 }
201 }
202
203 prev_batch_time = Instant::now();
204 }
205 }
206}