1use crate::buffer::Buffer;
2use crate::connect;
3use crate::error::ClientError;
4use crate::queue::RecoverySettings;
5use crate::worker::{Message, Worker};
6use crossbeam_channel::{bounded, unbounded, Sender};
7use serde::Serialize;
8use std::fmt::Debug;
9use std::io;
10use std::net::ToSocketAddrs;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::time::{Duration, SystemTime};
13
14pub trait Client {
15 fn send<A>(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), ClientError>
16 where
17 A: Serialize;
18 fn terminate(&self) -> Result<(), ClientError>;
19}
20
21pub struct WorkerPool {
22 worker: Worker,
23 sender: Sender<Message>,
24 terminated: AtomicBool,
25}
26
27impl WorkerPool {
28 pub fn create<A>(addr: &A) -> io::Result<Self>
29 where
30 A: ToSocketAddrs + Clone + Debug + Send + 'static,
31 {
32 Self::with_settings(addr, &Default::default())
33 }
34 pub fn with_settings<A>(addr: &A, settings: &Settings) -> io::Result<Self>
35 where
36 A: ToSocketAddrs + Clone + Debug + Send + 'static,
37 {
38 let (sender, receiver) = unbounded();
39
40 info!("Worker creating...");
41
42 let conn_settings = connect::ConnectionSettings {
43 connect_retry_initial_delay: settings.connection_retry_initial_delay,
44 connect_retry_max_delay: settings.connection_retry_max_delay,
45 connect_retry_timeout: settings.connection_retry_timeout,
46 write_timeout: settings.write_timeout,
47 read_timeout: settings.read_timeout,
48 write_retry_initial_delay: settings.write_retry_initial_delay,
49 write_retry_max_delay: settings.write_retry_max_delay,
50 write_retry_timeout: settings.write_retry_timeout,
51 read_retry_initial_delay: settings.read_retry_initial_delay,
52 read_retry_max_delay: settings.read_retry_max_delay,
53 read_retry_timeout: settings.read_retry_timeout,
54 };
55 let worker = Worker::create(
56 addr.clone(),
57 conn_settings,
58 receiver,
59 settings.flush_period,
60 settings.max_flush_entries,
61 settings.recovery_settings,
62 )?;
63
64 Ok(Self {
65 worker,
66 sender,
67 terminated: AtomicBool::new(false),
68 })
69 }
70}
71
72impl Client for WorkerPool {
73 fn send<A>(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), ClientError>
74 where
75 A: Serialize,
76 A: Buffer<A>,
77 {
78 if self.terminated.load(Ordering::Acquire) {
79 debug!("Worker does already closed.");
80 return Ok(());
81 }
82
83 let buf = a.pack().map_err(ClientError::Buffer)?;
84
85 self.sender
86 .send(Message::Queuing(tag, timestamp, buf))
87 .map_err(ClientError::SendChannel)?;
88 Ok(())
89 }
90
91 fn terminate(&self) -> Result<(), ClientError> {
92 if self.terminated.fetch_or(true, Ordering::SeqCst) {
93 info!("Worker does already terminated.");
94 return Ok(());
95 }
96
97 info!("Sending terminate message to worker.");
98
99 let (sender, receiver) = bounded::<()>(0);
100 self.sender.send(Message::Terminating(sender)).unwrap();
101 receiver.recv().map_err(ClientError::RecieveChannel)?;
102
103 Ok(())
104 }
105}
106
107impl Drop for WorkerPool {
108 fn drop(&mut self) {
109 self.terminate().unwrap();
110 let wkr = &mut self.worker;
111
112 info!("Shutting down worker.");
113
114 wkr.join_handler();
115 }
116}
117
118#[derive(Clone)]
119pub struct Settings {
120 pub flush_period: Duration,
121 pub max_flush_entries: usize,
122 pub connection_retry_initial_delay: Duration,
123 pub connection_retry_max_delay: Duration,
124 pub connection_retry_timeout: Duration,
125 pub write_timeout: Duration,
126 pub read_timeout: Duration,
127
128 pub write_retry_initial_delay: Duration,
129 pub write_retry_max_delay: Duration,
130 pub write_retry_timeout: Duration,
131 pub read_retry_initial_delay: Duration,
132 pub read_retry_max_delay: Duration,
133 pub read_retry_timeout: Duration,
134
135 #[deprecated(
136 since = "1.2.0",
137 note = "use `recovery_settings` instead, it has no effect"
138 )]
139 pub does_recover: bool,
140 pub recovery_settings: RecoverySettings,
141}
142
143impl Default for Settings {
144 fn default() -> Self {
145 Settings {
146 flush_period: Duration::from_millis(256),
147 max_flush_entries: 1024,
148 connection_retry_initial_delay: Duration::from_millis(50),
149 connection_retry_max_delay: Duration::from_secs(5),
150 connection_retry_timeout: Duration::from_secs(60),
151 write_retry_initial_delay: Duration::from_millis(5),
152 write_retry_max_delay: Duration::from_secs(5),
153 write_retry_timeout: Duration::from_secs(30),
154 read_retry_initial_delay: Duration::from_millis(5),
155 read_retry_max_delay: Duration::from_secs(5),
156 read_retry_timeout: Duration::from_secs(10),
157 write_timeout: Duration::from_secs(1),
158 read_timeout: Duration::from_secs(1),
159 #[allow(deprecated)]
160 does_recover: false,
161 recovery_settings: RecoverySettings::default(),
162 }
163 }
164}
165
#[cfg(test)]
mod test {
    // Placeholder: this module currently contains no unit tests.
}