doric/runtime/
endpoint.rs

1use crate::backend::LoggerConfig;
2use crate::config;
3use crate::encode::parser::Message;
4use crate::runtime::append::AppendBuilder;
5use crate::runtime::append::Appender;
6use crossbeam_queue::SegQueue;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::{thread, time};
10
11pub struct Endpoint {
12    #[allow(dead_code)]
13    interval: u64,
14    rx: Arc<Mutex<SegQueue<Message>>>,
15    appender: Box<dyn Appender>,
16}
17
18pub struct EndpointBuilder {
19    interval: u64,
20}
21
22impl EndpointBuilder {
23    pub fn new() -> EndpointBuilder {
24        EndpointBuilder { interval: 10 }
25    }
26
27    pub fn build(&mut self, conf: &config::Config, rx: Arc<Mutex<SegQueue<Message>>>) {
28        let lc = LoggerConfig {
29            path: conf.path.clone(),
30            max_size: conf.max_size * 1024 * 1024,
31            max_segments: conf.max_segments,
32            log_type: conf.log_type.clone(),
33        };
34
35        let mut interval = self.interval;
36        if conf.interval != 0 {
37            interval = conf.interval;
38        }
39
40        let mut ep = Endpoint {
41            interval: interval,
42            rx: rx,
43            appender: AppendBuilder::build(lc),
44        };
45
46        ep.run()
47    }
48}
49
50impl Endpoint {
51    pub fn run(&mut self) {
52        let delay = time::Duration::from_millis(self.interval);
53        let mut cycle = 0;
54        loop {
55            if !self.rx.lock().unwrap().is_empty() {
56                let msg = self.rx.lock().unwrap().pop().unwrap();
57                match self.appender.append(&msg) {
58                    Ok(_v) => {}
59                    Err(e) => {
60                        println!("error = {:?}", e);
61                    }
62                };
63            } else {
64                cycle += 1;
65            }
66
67            if cycle == 10 {
68                thread::sleep(delay);
69                cycle = 0;
70            }
71        }
72    }
73}