tokio_fluent/
client.rs

1//! Fluentd client
2//!
3//! ## Example
4//!
5//! ```
6//! use tokio_fluent::{Client, Config, FluentClient};
7//! use tokio_fluent::record::{Map, Value};
8//!
9//! #[tokio::main]
10//! async fn main() {
11//!     let client = Client::new_tcp(
12//!         "127.0.0.1:24224".parse().unwrap(),
13//!         &Config{..Default::default()},
14//!     )
15//!     .await
16//!     .unwrap();
17//!
18//!     let mut map = Map::new();
19//!     map.insert("age".to_string(), 10.into());
20//!     client.send("fluent.test", map).unwrap();
21//! }
22//! ```
23
24use std::net::SocketAddr;
25use std::path::Path;
26use std::sync::Arc;
27use std::time::Duration;
28
29use anyhow::Result as AnyhowResult;
30use base64::{engine::general_purpose, Engine};
31use tokio::sync::broadcast::{channel, Sender};
32use uuid::Uuid;
33
34use crate::record::Map;
35use crate::worker::{
36    Message, Options, Record, RetryConfig, TCPConnectionConfig, UnixSocketConfig, Worker,
37};
38
/// Error returned by client operations when a message cannot be queued on
/// the channel.
///
/// Only the textual description of the underlying failure is kept.
#[derive(Debug, Clone)]
pub struct SendError {
    source: String,
}

impl std::fmt::Display for SendError {
    /// Writes the stored failure description verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.source)
    }
}

impl std::error::Error for SendError {}
51
/// Config for a client.
///
/// All fields are public; use [`Default`] and struct-update syntax to
/// override only the values you need.
#[derive(Debug, Clone)]
pub struct Config {
    /// How long to wait when connecting to the fluentd server.
    /// Defaults to 3 seconds.
    pub timeout: Duration,
    /// Initial wait before the first retry, in milliseconds.
    /// Defaults to 500.
    pub retry_wait: u64,
    /// Maximum number of retries. Once the retry count exceeds this value
    /// the write/send operation fails. Defaults to 10.
    pub max_retry: u32,
    /// Upper bound on the wait between retries, in milliseconds.
    /// If the calculated retry wait exceeds this value the operation fails.
    /// Defaults to 60,000 (60 seconds).
    pub max_retry_wait: u64,
    /// Maximum lifetime of a connection before a reconnection is attempted.
    /// Reconnection is only triggered when new log lines are sent: if none
    /// arrive within this timeframe the connection stays open even though
    /// it is older than `max_connection_lifetime`.
    /// Defaults to 0 (no reconnection).
    pub max_connection_lifetime: Duration,
}
75
76impl Default for Config {
77    fn default() -> Self {
78        Self {
79            timeout: Duration::new(3, 0),
80            retry_wait: 500,
81            max_retry: 10,
82            max_retry_wait: 60000,
83            max_connection_lifetime: Duration::from_secs(0),
84        }
85    }
86}
87
88pub trait FluentClient: Send + Sync {
89    fn send(&self, tag: &str, record: Map) -> Result<(), SendError>;
90    fn stop(self) -> Result<(), SendError>;
91}
92
93#[derive(Debug, Clone)]
94/// A fluentd client.
95pub struct Client {
96    sender: Sender<Message>,
97}
98
99impl Client {
100    /// Connect to the fluentd server using TCP and create a worker with tokio::spawn.
101    pub async fn new_tcp(addr: SocketAddr, config: &Config) -> AnyhowResult<Client> {
102        let (sender, receiver) = channel(1024);
103
104        let config = config.clone();
105        let stream_config = Arc::new(TCPConnectionConfig {
106            addr: addr.to_owned(),
107            timeout: config.timeout,
108        });
109        // create the worker --
110        // new() will try to establish an connection, so it returns error if connection,
111        // so it returns error upon connection error
112        let mut worker = Worker::new(
113            stream_config,
114            config.max_connection_lifetime,
115            receiver,
116            RetryConfig {
117                initial_wait: config.retry_wait,
118                max: config.max_retry,
119                max_wait: config.max_retry_wait,
120            },
121        )
122        .await?;
123        let _ = tokio::spawn(async move { worker.run().await });
124
125        Ok(Self { sender })
126    }
127
128    /// Connect to the fluentd server using unix domain socket and create a worker with tokio::spawn.
129    pub async fn new_unix<P: AsRef<Path> + std::marker::Send>(
130        path: P,
131        config: &Config,
132    ) -> AnyhowResult<Client> {
133        let (sender, receiver) = channel(1024);
134
135        let config = config.clone();
136        let stream_config = Arc::new(UnixSocketConfig {
137            path: path.as_ref().to_path_buf(),
138            timeout: config.timeout,
139        });
140        // create the worker --
141        // new() will try to establish an connection, so it returns error if connection,
142        // so it returns error upon connection error
143        let mut worker = Worker::new(
144            stream_config,
145            config.max_connection_lifetime,
146            receiver,
147            RetryConfig {
148                initial_wait: config.retry_wait,
149                max: config.max_retry,
150                max_wait: config.max_retry_wait,
151            },
152        )
153        .await?;
154        let _ = tokio::spawn(async move {
155            worker.run().await;
156        });
157
158        Ok(Self { sender })
159    }
160
161    fn send_with_time(&self, tag: &str, record: Map, timestamp: i64) -> Result<(), SendError> {
162        let record = Record {
163            tag: tag.into(),
164            record,
165            timestamp,
166            options: Options {
167                chunk: general_purpose::STANDARD.encode(Uuid::new_v4()),
168            },
169        };
170        self.sender
171            .send(Message::Record(record))
172            .map_err(|e| SendError {
173                source: e.to_string(),
174            })?;
175        Ok(())
176    }
177}
178
179impl FluentClient for Client {
180    /// Send a fluent record to the fluentd server.
181    ///
182    /// ## Params:
183    /// `tag` - Event category of a record to send.
184    ///
185    /// `record` - Map object to send as a fluent record.
186    fn send(&self, tag: &str, record: Map) -> Result<(), SendError> {
187        self.send_with_time(tag, record, chrono::Local::now().timestamp())
188    }
189
190    /// Stop the worker.
191    fn stop(self) -> Result<(), SendError> {
192        self.sender
193            .send(Message::Terminate)
194            .map_err(|e| SendError {
195                source: e.to_string(),
196            })?;
197        Ok(())
198    }
199}
200
201/// The worker is terminated when client is dropped.
202impl Drop for Client {
203    fn drop(&mut self) {
204        let _ = self.sender.send(Message::Terminate);
205    }
206}
207
/// NopClient does nothing.
///
/// A field-less stand-in that can be used wherever a `FluentClient` is
/// expected.
#[derive(Debug, Clone)]
pub struct NopClient;
211
212impl FluentClient for NopClient {
213    fn send(&self, _tag: &str, _record: Map) -> Result<(), SendError> {
214        Ok(())
215    }
216
217    fn stop(self) -> Result<(), SendError> {
218        Ok(())
219    }
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// A record sent with an explicit timestamp arrives on the channel with
    /// its tag, payload and timestamp unchanged.
    #[test]
    fn test_send_with_time() {
        // NOTE(review): `HashMap` and `Value` are not named directly in this
        // test; presumably `record_map!` expands to them — confirm before
        // removing either import.
        use std::collections::HashMap;

        use chrono::TimeZone;

        use crate::record::Value;
        use crate::record_map;

        let (sender, mut receiver) = channel(1024);
        let client = Client { sender };

        let timestamp = chrono::Utc.timestamp_opt(1234567, 0).unwrap().timestamp();
        let record = record_map!("age".to_string() => 20.into());
        assert!(
            client.send_with_time("test", record, timestamp).is_ok(),
            "failed to send with time"
        );

        let got = receiver.try_recv().expect("failed to receive");
        match got {
            Message::Record(r) => {
                assert_eq!(r.tag, "test");
                assert_eq!(r.record, record_map!("age".to_string() => 20.into()));
                assert_eq!(r.timestamp, 1234567);
            }
            Message::Terminate => unreachable!("got terminate message"),
        }
    }

    /// `stop` puts a `Terminate` message on the channel.
    #[test]
    fn test_stop() {
        let (sender, mut receiver) = channel(1024);
        let client = Client { sender };
        assert!(client.stop().is_ok(), "failed to stop");

        let got = receiver.try_recv().expect("failed to receive");
        match got {
            Message::Record(_) => unreachable!("got record message"),
            Message::Terminate => {}
        };
    }

    /// Dropping a client without calling `stop` still sends `Terminate`.
    #[test]
    fn test_client_drop_sends_terminate() {
        let (sender, mut receiver) = channel(1024);
        drop(Client { sender });

        let got = receiver.try_recv().expect("failed to receive");
        match got {
            Message::Record(_) => unreachable!("got record message"),
            Message::Terminate => {}
        };
    }

    /// Every field of the default configuration has its documented value.
    #[test]
    fn test_default_config() {
        let config: Config = Default::default();
        assert_eq!(config.timeout, Duration::new(3, 0));
        assert_eq!(config.retry_wait, 500);
        assert_eq!(config.max_retry, 10);
        assert_eq!(config.max_retry_wait, 60000);
        assert_eq!(config.max_connection_lifetime, Duration::from_secs(0));
    }
}