v_common_module/
veda_module.rs

1use crate::common::sys_sig_listener;
2use crate::module::{init_log, Module, PrepareError};
3use crate::v_onto::individual::*;
4use crate::v_onto::parser::parse_raw;
5use crossbeam_channel::{select, tick};
6use nng::{Protocol, Socket};
7use std::time::{Duration, Instant};
8use std::{thread, time};
9use v_queue::consumer::Consumer;
10use v_queue::record::ErrorQueue;
11
12pub trait VedaQueueModule {
13    fn before_batch(&mut self, size_batch: u32) -> Option<u32>;
14    fn prepare(&mut self, queue_element: &mut Individual) -> Result<bool, PrepareError>;
15    fn after_batch(&mut self, prepared_batch_size: u32) -> Result<bool, PrepareError>;
16    fn heartbeat(&mut self) -> Result<(), PrepareError>;
17    fn before_start(&mut self);
18    fn before_exit(&mut self);
19}
20
21impl Module {
22    pub fn prepare_queue(&mut self, veda_module: &mut dyn VedaQueueModule) {
23        init_log(&self.name);
24
25        let queue_consumer = &mut Consumer::new("./data/queue", &self.name, "individuals-flow").expect("!!!!!!!!! FAIL QUEUE");
26
27        if let Ok(ch) = sys_sig_listener() {
28            self.syssig_ch = Some(ch);
29        }
30
31        let mut soc = Socket::new(Protocol::Sub0).unwrap();
32        let mut count_timeout_error = 0;
33
34        let mut prev_batch_time = Instant::now();
35        let update = tick(Duration::from_millis(1));
36        veda_module.before_start();
37        loop {
38            if let Some(qq) = &self.syssig_ch {
39                select! {
40                    recv(update) -> _ => {
41                    }
42                    recv(qq) -> _ => {
43                        info!("Exit");
44                        veda_module.before_exit();
45                        std::process::exit (exitcode::OK);
46                        //break;
47                    }
48                }
49            }
50
51            match veda_module.heartbeat() {
52                Err(e) => {
53                    if let PrepareError::Fatal = e {
54                        error!("heartbeat: found fatal error, stop listen queue");
55                        break;
56                    }
57                }
58                _ => {}
59            }
60
61            if let Some(s) = self.connect_to_notify_channel() {
62                soc = s;
63            }
64
65            // read queue current part info
66            if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
67                error!("{} get_info_of_part {}: {}", self.queue_prepared_count, queue_consumer.id, e.as_str());
68                continue;
69            }
70
71            let size_batch = queue_consumer.get_batch_size();
72
73            let mut max_size_batch = size_batch;
74            if let Some(m) = self.max_batch_size {
75                max_size_batch = m;
76            }
77
78            if size_batch > 0 {
79                debug!("queue: batch size={}", size_batch);
80                if let Some(new_size) = veda_module.before_batch(size_batch) {
81                    max_size_batch = new_size;
82                }
83            }
84
85            let mut prepared_batch_size = 0;
86            for _it in 0..max_size_batch {
87                // пробуем взять из очереди заголовок сообщения
88                if !queue_consumer.pop_header() {
89                    break;
90                }
91
92                let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
93
94                // заголовок взят успешно, занесем содержимое сообщения в структуру Individual
95                if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
96                    match e {
97                        ErrorQueue::FailReadTailMessage => {
98                            break;
99                        }
100                        ErrorQueue::InvalidChecksum => {
101                            error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
102                            queue_consumer.seek_next_pos();
103                            break;
104                        }
105                        _ => {
106                            error!("{} get msg from queue: {}", self.queue_prepared_count, e.as_str());
107                            break;
108                        }
109                    }
110                }
111
112                let mut need_commit = true;
113                /*
114                                if let Some(&mut f) = prepare_raw {
115                                    match f(self, module, &raw, queue_consumer) {
116                                        Err(e) => {
117                                            if let PrepareError::Fatal = e {
118                                                warn!("prepare: found fatal error, stop listen queue");
119                                                return;
120                                            }
121                                        }
122                                        Ok(b) => {
123                                            need_commit = b;
124                                        }
125                                    }
126                                }
127                */
128                {
129                    let mut queue_element = Individual::new_raw(raw);
130                    if parse_raw(&mut queue_element).is_ok() {
131                        let mut is_processed = true;
132                        if let Some(assigned_subsystems) = queue_element.get_first_integer("assigned_subsystems") {
133                            if assigned_subsystems > 0 {
134                                if let Some(my_subsystem_id) = self.subsystem_id {
135                                    if assigned_subsystems & my_subsystem_id == 0 {
136                                        is_processed = false;
137                                    }
138                                } else {
139                                    is_processed = false;
140                                }
141                            }
142                        }
143
144                        if is_processed {
145                            match veda_module.prepare(&mut queue_element) {
146                                Err(e) => {
147                                    if let PrepareError::Fatal = e {
148                                        warn!("prepare: found fatal error, stop listen queue");
149                                        return;
150                                    }
151                                }
152                                Ok(b) => {
153                                    need_commit = b;
154                                }
155                            }
156                        }
157                    }
158                }
159
160                queue_consumer.next(need_commit);
161
162                self.queue_prepared_count += 1;
163
164                if self.queue_prepared_count % 1000 == 0 {
165                    info!("get from queue, count: {}", self.queue_prepared_count);
166                }
167                prepared_batch_size += 1;
168            }
169
170            if size_batch > 0 {
171                match veda_module.after_batch(prepared_batch_size) {
172                    Ok(b) => {
173                        if b {
174                            queue_consumer.commit();
175                        }
176                    }
177                    Err(e) => {
178                        if let PrepareError::Fatal = e {
179                            warn!("after_batch: found fatal error, stop listen queue");
180                            return;
181                        }
182                    }
183                }
184            }
185
186            if prepared_batch_size == size_batch {
187                let wmsg = soc.recv();
188                if let Err(e) = wmsg {
189                    debug!("fail recv from queue notify channel, err={:?}", e);
190
191                    if count_timeout_error > 0 && size_batch > 0 {
192                        warn!("queue changed but we not received notify message, need reconnect...");
193                        self.is_ready_notify_channel = false;
194                        count_timeout_error += 1;
195                    }
196                } else {
197                    count_timeout_error = 0;
198                }
199            }
200
201            if let Some(t) = self.max_timeout_between_batches {
202                let delta = prev_batch_time.elapsed().as_millis() as u64;
203                if let Some(c) = self.min_batch_size_to_cancel_timeout {
204                    if prepared_batch_size < c && delta < t {
205                        thread::sleep(time::Duration::from_millis(t - delta));
206                        info!("sleep {} ms", t - delta);
207                    }
208                } else if delta < t {
209                    thread::sleep(time::Duration::from_millis(t - delta));
210                    info!("sleep {} ms", t - delta);
211                }
212            }
213
214            prev_batch_time = Instant::now();
215        }
216    }
217}