Skip to main content

v_module_queue/
veda_module.rs

1use crate::module::{init_log, Module, PrepareError};
2use crate::signals::sys_sig_listener;
3use crossbeam_channel::{select, tick};
4use exitcode;
5use nng::{Protocol, Socket};
6use std::time::{Duration, Instant};
7use std::{thread, time};
8use v_individual_model::onto::individual::{Individual, RawObj};
9use v_individual_model::onto::parser::parse_raw;
10use v_queue::consumer::Consumer;
11use v_queue::record::ErrorQueue;
12
13pub trait VedaQueueModule {
14    fn before_batch(&mut self, size_batch: u32) -> Option<u32>;
15    fn prepare(&mut self, queue_element: &mut Individual) -> Result<bool, PrepareError>;
16    fn after_batch(&mut self, prepared_batch_size: u32) -> Result<bool, PrepareError>;
17    fn heartbeat(&mut self) -> Result<(), PrepareError>;
18    fn before_start(&mut self);
19    fn before_exit(&mut self);
20}
21
22impl Module {
23    pub fn prepare_queue(&mut self, veda_module: &mut dyn VedaQueueModule) {
24        init_log(&self.name);
25
26        let queue_consumer =
27            &mut Consumer::new("./data/queue", &self.name, "individuals-flow")
28                .expect("!!!!!!!!! FAIL QUEUE");
29
30        if let Ok(ch) = sys_sig_listener() {
31            self.syssig_ch = Some(ch);
32        }
33
34        let mut soc = Socket::new(Protocol::Sub0).unwrap();
35        let mut count_timeout_error = 0;
36
37        let mut prev_batch_time = Instant::now();
38        let update = tick(Duration::from_millis(1));
39        veda_module.before_start();
40        loop {
41            if let Some(qq) = &self.syssig_ch {
42                select! {
43                    recv(update) -> _ => {
44                    }
45                    recv(qq) -> _ => {
46                        info!("Exit");
47                        veda_module.before_exit();
48                        std::process::exit(exitcode::OK);
49                    }
50                }
51            }
52
53            if let Err(PrepareError::Fatal) = veda_module.heartbeat() {
54                error!("heartbeat: found fatal error, stop listen queue");
55                break;
56            }
57
58            if let Some(s) = self.connect_to_notify_channel() {
59                soc = s;
60            }
61
62            if let Err(e) = queue_consumer.queue.get_info_of_part(queue_consumer.id, true) {
63                error!(
64                    "{} get_info_of_part {}: {}",
65                    self.queue_prepared_count,
66                    queue_consumer.id,
67                    e.as_str()
68                );
69                continue;
70            }
71
72            let size_batch = queue_consumer.get_batch_size();
73
74            let mut max_size_batch = size_batch;
75            if let Some(m) = self.max_batch_size {
76                max_size_batch = m;
77            }
78
79            if size_batch > 0 {
80                debug!("queue: batch size={}", size_batch);
81                if let Some(new_size) = veda_module.before_batch(size_batch) {
82                    max_size_batch = new_size;
83                }
84            }
85
86            let mut prepared_batch_size = 0;
87            for _it in 0..max_size_batch {
88                if !queue_consumer.pop_header() {
89                    break;
90                }
91
92                let mut raw = RawObj::new(vec![0; (queue_consumer.header.msg_length) as usize]);
93
94                if let Err(e) = queue_consumer.pop_body(&mut raw.data) {
95                    match e {
96                        ErrorQueue::FailReadTailMessage => break,
97                        ErrorQueue::InvalidChecksum => {
98                            error!("[module] consumer:pop_body: invalid CRC, attempt seek next record");
99                            queue_consumer.seek_next_pos();
100                            break;
101                        }
102                        _ => {
103                            error!(
104                                "{} get msg from queue: {}",
105                                self.queue_prepared_count,
106                                e.as_str()
107                            );
108                            break;
109                        }
110                    }
111                }
112
113                let mut need_commit = true;
114
115                let mut queue_element = Individual::new_raw(raw);
116                if parse_raw(&mut queue_element).is_ok() {
117                    let mut is_processed = true;
118                    if let Some(assigned_subsystems) =
119                        queue_element.get_first_integer("assigned_subsystems")
120                    {
121                        if assigned_subsystems > 0 {
122                            if let Some(my_subsystem_id) = self.subsystem_id {
123                                if assigned_subsystems & my_subsystem_id == 0 {
124                                    is_processed = false;
125                                }
126                            } else {
127                                is_processed = false;
128                            }
129                        }
130                    }
131
132                    if is_processed {
133                        match veda_module.prepare(&mut queue_element) {
134                            Err(e) => {
135                                if let PrepareError::Fatal = e {
136                                    warn!("prepare: found fatal error, stop listen queue");
137                                    return;
138                                }
139                            }
140                            Ok(b) => {
141                                need_commit = b;
142                            }
143                        }
144                    }
145                }
146
147                if need_commit {
148                    queue_consumer.commit();
149                }
150
151                self.queue_prepared_count += 1;
152
153                if self.queue_prepared_count % 1000 == 0 {
154                    info!("get from queue, count: {}", self.queue_prepared_count);
155                }
156                prepared_batch_size += 1;
157            }
158
159            if size_batch > 0 {
160                match veda_module.after_batch(prepared_batch_size) {
161                    Ok(b) => {
162                        if b {
163                            queue_consumer.commit();
164                        }
165                    }
166                    Err(e) => {
167                        if let PrepareError::Fatal = e {
168                            warn!("after_batch: found fatal error, stop listen queue");
169                            return;
170                        }
171                    }
172                }
173            }
174
175            if prepared_batch_size == size_batch {
176                let wmsg = soc.recv();
177                if let Err(e) = wmsg {
178                    debug!("fail recv from queue notify channel, err={:?}", e);
179
180                    if count_timeout_error > 0 && size_batch > 0 {
181                        warn!("queue changed but we not received notify message, need reconnect...");
182                        self.is_ready_notify_channel = false;
183                        count_timeout_error += 1;
184                    }
185                } else {
186                    count_timeout_error = 0;
187                }
188            }
189
190            if let Some(t) = self.max_timeout_between_batches {
191                let delta = prev_batch_time.elapsed().as_millis() as u64;
192                if let Some(c) = self.min_batch_size_to_cancel_timeout {
193                    if prepared_batch_size < c && delta < t {
194                        thread::sleep(time::Duration::from_millis(t - delta));
195                        info!("sleep {} ms", t - delta);
196                    }
197                } else if delta < t {
198                    thread::sleep(time::Duration::from_millis(t - delta));
199                    info!("sleep {} ms", t - delta);
200                }
201            }
202
203            prev_batch_time = Instant::now();
204        }
205    }
206}