poston/
client.rs

1use crate::buffer::Buffer;
2use crate::connect;
3use crate::error::ClientError;
4use crate::queue::RecoverySettings;
5use crate::worker::{Message, Worker};
6use crossbeam_channel::{bounded, unbounded, Sender};
7use serde::Serialize;
8use std::fmt::Debug;
9use std::io;
10use std::net::ToSocketAddrs;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::time::{Duration, SystemTime};
13
14pub trait Client {
15    fn send<A>(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), ClientError>
16    where
17        A: Serialize;
18    fn terminate(&self) -> Result<(), ClientError>;
19}
20
21pub struct WorkerPool {
22    worker: Worker,
23    sender: Sender<Message>,
24    terminated: AtomicBool,
25}
26
27impl WorkerPool {
28    pub fn create<A>(addr: &A) -> io::Result<Self>
29    where
30        A: ToSocketAddrs + Clone + Debug + Send + 'static,
31    {
32        Self::with_settings(addr, &Default::default())
33    }
34    pub fn with_settings<A>(addr: &A, settings: &Settings) -> io::Result<Self>
35    where
36        A: ToSocketAddrs + Clone + Debug + Send + 'static,
37    {
38        let (sender, receiver) = unbounded();
39
40        info!("Worker creating...");
41
42        let conn_settings = connect::ConnectionSettings {
43            connect_retry_initial_delay: settings.connection_retry_initial_delay,
44            connect_retry_max_delay: settings.connection_retry_max_delay,
45            connect_retry_timeout: settings.connection_retry_timeout,
46            write_timeout: settings.write_timeout,
47            read_timeout: settings.read_timeout,
48            write_retry_initial_delay: settings.write_retry_initial_delay,
49            write_retry_max_delay: settings.write_retry_max_delay,
50            write_retry_timeout: settings.write_retry_timeout,
51            read_retry_initial_delay: settings.read_retry_initial_delay,
52            read_retry_max_delay: settings.read_retry_max_delay,
53            read_retry_timeout: settings.read_retry_timeout,
54        };
55        let worker = Worker::create(
56            addr.clone(),
57            conn_settings,
58            receiver,
59            settings.flush_period,
60            settings.max_flush_entries,
61            settings.recovery_settings,
62        )?;
63
64        Ok(Self {
65            worker,
66            sender,
67            terminated: AtomicBool::new(false),
68        })
69    }
70}
71
72impl Client for WorkerPool {
73    fn send<A>(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), ClientError>
74    where
75        A: Serialize,
76        A: Buffer<A>,
77    {
78        if self.terminated.load(Ordering::Acquire) {
79            debug!("Worker does already closed.");
80            return Ok(());
81        }
82
83        let buf = a.pack().map_err(ClientError::Buffer)?;
84
85        self.sender
86            .send(Message::Queuing(tag, timestamp, buf))
87            .map_err(ClientError::SendChannel)?;
88        Ok(())
89    }
90
91    fn terminate(&self) -> Result<(), ClientError> {
92        if self.terminated.fetch_or(true, Ordering::SeqCst) {
93            info!("Worker does already terminated.");
94            return Ok(());
95        }
96
97        info!("Sending terminate message to worker.");
98
99        let (sender, receiver) = bounded::<()>(0);
100        self.sender.send(Message::Terminating(sender)).unwrap();
101        receiver.recv().map_err(ClientError::RecieveChannel)?;
102
103        Ok(())
104    }
105}
106
107impl Drop for WorkerPool {
108    fn drop(&mut self) {
109        self.terminate().unwrap();
110        let wkr = &mut self.worker;
111
112        info!("Shutting down worker.");
113
114        wkr.join_handler();
115    }
116}
117
118#[derive(Clone)]
119pub struct Settings {
120    pub flush_period: Duration,
121    pub max_flush_entries: usize,
122    pub connection_retry_initial_delay: Duration,
123    pub connection_retry_max_delay: Duration,
124    pub connection_retry_timeout: Duration,
125    pub write_timeout: Duration,
126    pub read_timeout: Duration,
127
128    pub write_retry_initial_delay: Duration,
129    pub write_retry_max_delay: Duration,
130    pub write_retry_timeout: Duration,
131    pub read_retry_initial_delay: Duration,
132    pub read_retry_max_delay: Duration,
133    pub read_retry_timeout: Duration,
134
135    #[deprecated(
136        since = "1.2.0",
137        note = "use `recovery_settings` instead, it has no effect"
138    )]
139    pub does_recover: bool,
140    pub recovery_settings: RecoverySettings,
141}
142
143impl Default for Settings {
144    fn default() -> Self {
145        Settings {
146            flush_period: Duration::from_millis(256),
147            max_flush_entries: 1024,
148            connection_retry_initial_delay: Duration::from_millis(50),
149            connection_retry_max_delay: Duration::from_secs(5),
150            connection_retry_timeout: Duration::from_secs(60),
151            write_retry_initial_delay: Duration::from_millis(5),
152            write_retry_max_delay: Duration::from_secs(5),
153            write_retry_timeout: Duration::from_secs(30),
154            read_retry_initial_delay: Duration::from_millis(5),
155            read_retry_max_delay: Duration::from_secs(5),
156            read_retry_timeout: Duration::from_secs(10),
157            write_timeout: Duration::from_secs(1),
158            read_timeout: Duration::from_secs(1),
159            #[allow(deprecated)]
160            does_recover: false,
161            recovery_settings: RecoverySettings::default(),
162        }
163    }
164}
165
166#[cfg(test)]
167mod test {}