doric/runtime/
endpoint.rs1use crate::backend::LoggerConfig;
2use crate::config;
3use crate::encode::parser::Message;
4use crate::runtime::append::AppendBuilder;
5use crate::runtime::append::Appender;
6use crossbeam_queue::SegQueue;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::{thread, time};
10
11pub struct Endpoint {
12 #[allow(dead_code)]
13 interval: u64,
14 rx: Arc<Mutex<SegQueue<Message>>>,
15 appender: Box<dyn Appender>,
16}
17
/// Builder that assembles an `Endpoint` from configuration and starts it.
pub struct EndpointBuilder {
    /// Fallback polling interval, used by `build` when the configuration's
    /// own interval is `0`.
    interval: u64,
}
21
22impl EndpointBuilder {
23 pub fn new() -> EndpointBuilder {
24 EndpointBuilder { interval: 10 }
25 }
26
27 pub fn build(&mut self, conf: &config::Config, rx: Arc<Mutex<SegQueue<Message>>>) {
28 let lc = LoggerConfig {
29 path: conf.path.clone(),
30 max_size: conf.max_size * 1024 * 1024,
31 max_segments: conf.max_segments,
32 log_type: conf.log_type.clone(),
33 };
34
35 let mut interval = self.interval;
36 if conf.interval != 0 {
37 interval = conf.interval;
38 }
39
40 let mut ep = Endpoint {
41 interval: interval,
42 rx: rx,
43 appender: AppendBuilder::build(lc),
44 };
45
46 ep.run()
47 }
48}
49
50impl Endpoint {
51 pub fn run(&mut self) {
52 let delay = time::Duration::from_millis(self.interval);
53 let mut cycle = 0;
54 loop {
55 if !self.rx.lock().unwrap().is_empty() {
56 let msg = self.rx.lock().unwrap().pop().unwrap();
57 match self.appender.append(&msg) {
58 Ok(_v) => {}
59 Err(e) => {
60 println!("error = {:?}", e);
61 }
62 };
63 } else {
64 cycle += 1;
65 }
66
67 if cycle == 10 {
68 thread::sleep(delay);
69 cycle = 0;
70 }
71 }
72 }
73}