1use std::net::SocketAddr;
25use std::path::Path;
26use std::sync::Arc;
27use std::time::Duration;
28
29use anyhow::Result as AnyhowResult;
30use base64::{engine::general_purpose, Engine};
31use tokio::sync::broadcast::{channel, Sender};
32use uuid::Uuid;
33
34use crate::record::Map;
35use crate::worker::{
36 Message, Options, Record, RetryConfig, TCPConnectionConfig, UnixSocketConfig, Worker,
37};
38
/// Error returned by [`FluentClient`] operations when a message could not be
/// pushed onto the worker channel.
///
/// Only the textual description of the underlying failure is retained.
#[derive(Clone, Debug)]
pub struct SendError {
    /// Human-readable description of the failure; printed verbatim by `Display`.
    source: String,
}

impl std::error::Error for SendError {}

impl std::fmt::Display for SendError {
    /// Writes the stored description as-is. Width, fill and precision flags on
    /// `f` are not applied to the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.source)
    }
}
51
/// Settings used when constructing a [`Client`] and its background worker.
#[derive(Clone, Debug)]
pub struct Config {
    /// Timeout forwarded to the TCP / Unix-socket connection configuration.
    /// Defaults to 3 seconds.
    pub timeout: Duration,
    /// Initial wait before retrying, forwarded as `RetryConfig::initial_wait`.
    /// Defaults to `500` (presumably milliseconds — TODO confirm in the worker).
    pub retry_wait: u64,
    /// Upper bound on the number of retries, forwarded as `RetryConfig::max`.
    /// Defaults to `10`.
    pub max_retry: u32,
    /// Upper bound on the wait between retries, forwarded as
    /// `RetryConfig::max_wait`. Defaults to `60000` (presumably milliseconds —
    /// TODO confirm in the worker).
    pub max_retry_wait: u64,
    /// Maximum lifetime of a connection, handed to the worker unchanged.
    /// Defaults to zero.
    // NOTE(review): zero presumably means "never recycle the connection";
    // confirm against the worker implementation.
    pub max_connection_lifetime: Duration,
}

impl Default for Config {
    /// Returns the stock settings: 3 s timeout, `500` initial retry wait,
    /// at most `10` retries, `60000` maximum retry wait, zero connection lifetime.
    fn default() -> Self {
        Config {
            timeout: Duration::from_secs(3),
            retry_wait: 500,
            max_retry: 10,
            max_retry_wait: 60_000,
            max_connection_lifetime: Duration::ZERO,
        }
    }
}
87
88pub trait FluentClient: Send + Sync {
89 fn send(&self, tag: &str, record: Map) -> Result<(), SendError>;
90 fn stop(self) -> Result<(), SendError>;
91}
92
93#[derive(Debug, Clone)]
94pub struct Client {
96 sender: Sender<Message>,
97}
98
99impl Client {
100 pub async fn new_tcp(addr: SocketAddr, config: &Config) -> AnyhowResult<Client> {
102 let (sender, receiver) = channel(1024);
103
104 let config = config.clone();
105 let stream_config = Arc::new(TCPConnectionConfig {
106 addr: addr.to_owned(),
107 timeout: config.timeout,
108 });
109 let mut worker = Worker::new(
113 stream_config,
114 config.max_connection_lifetime,
115 receiver,
116 RetryConfig {
117 initial_wait: config.retry_wait,
118 max: config.max_retry,
119 max_wait: config.max_retry_wait,
120 },
121 )
122 .await?;
123 let _ = tokio::spawn(async move { worker.run().await });
124
125 Ok(Self { sender })
126 }
127
128 pub async fn new_unix<P: AsRef<Path> + std::marker::Send>(
130 path: P,
131 config: &Config,
132 ) -> AnyhowResult<Client> {
133 let (sender, receiver) = channel(1024);
134
135 let config = config.clone();
136 let stream_config = Arc::new(UnixSocketConfig {
137 path: path.as_ref().to_path_buf(),
138 timeout: config.timeout,
139 });
140 let mut worker = Worker::new(
144 stream_config,
145 config.max_connection_lifetime,
146 receiver,
147 RetryConfig {
148 initial_wait: config.retry_wait,
149 max: config.max_retry,
150 max_wait: config.max_retry_wait,
151 },
152 )
153 .await?;
154 let _ = tokio::spawn(async move {
155 worker.run().await;
156 });
157
158 Ok(Self { sender })
159 }
160
161 fn send_with_time(&self, tag: &str, record: Map, timestamp: i64) -> Result<(), SendError> {
162 let record = Record {
163 tag: tag.into(),
164 record,
165 timestamp,
166 options: Options {
167 chunk: general_purpose::STANDARD.encode(Uuid::new_v4()),
168 },
169 };
170 self.sender
171 .send(Message::Record(record))
172 .map_err(|e| SendError {
173 source: e.to_string(),
174 })?;
175 Ok(())
176 }
177}
178
179impl FluentClient for Client {
180 fn send(&self, tag: &str, record: Map) -> Result<(), SendError> {
187 self.send_with_time(tag, record, chrono::Local::now().timestamp())
188 }
189
190 fn stop(self) -> Result<(), SendError> {
192 self.sender
193 .send(Message::Terminate)
194 .map_err(|e| SendError {
195 source: e.to_string(),
196 })?;
197 Ok(())
198 }
199}
200
201impl Drop for Client {
203 fn drop(&mut self) {
204 let _ = self.sender.send(Message::Terminate);
205 }
206}
207
/// A [`FluentClient`] that does nothing: every operation succeeds immediately
/// and no data is sent anywhere.
#[derive(Clone, Debug)]
pub struct NopClient;
211
212impl FluentClient for NopClient {
213 fn send(&self, _tag: &str, _record: Map) -> Result<(), SendError> {
214 Ok(())
215 }
216
217 fn stop(self) -> Result<(), SendError> {
218 Ok(())
219 }
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// A record sent with an explicit timestamp arrives on the channel with its
    /// tag, payload and timestamp intact.
    #[test]
    fn test_send_with_time() {
        // `HashMap` and `Value` are presumably required by the `record_map!`
        // expansion; keep them in scope.
        use std::collections::HashMap;

        use chrono::TimeZone;

        use crate::record::Value;
        use crate::record_map;

        let (sender, mut receiver) = channel(1024);
        let client = Client { sender };

        let timestamp = chrono::Utc.timestamp_opt(1234567, 0).unwrap().timestamp();
        let record = record_map!("age".to_string() => 20.into());
        assert!(
            client.send_with_time("test", record, timestamp).is_ok(),
            "failed to send with time"
        );

        let got = receiver.try_recv().expect("failed to receive");
        match got {
            Message::Record(r) => {
                assert_eq!(r.tag, "test");
                assert_eq!(r.record, record_map!("age".to_string() => 20.into()));
                assert_eq!(r.timestamp, 1234567);
            }
            Message::Terminate => unreachable!("got terminate message"),
        }
    }

    /// `stop` puts a terminate message on the channel.
    #[test]
    fn test_stop() {
        let (sender, mut receiver) = channel(1024);
        let client = Client { sender };
        assert!(client.stop().is_ok(), "failed to stop");

        let got = receiver.try_recv().expect("failed to receive");
        match got {
            Message::Record(_) => unreachable!("got record message"),
            Message::Terminate => {}
        };
    }

    /// Dropping a client puts a terminate message on the channel.
    #[test]
    fn test_client_drop_sends_terminate() {
        let (sender, mut receiver) = channel(1024);
        // Drop explicitly so the intent is visible.
        drop(Client { sender });

        let got = receiver.try_recv().expect("failed to receive");
        match got {
            Message::Record(_) => unreachable!("got record message"),
            Message::Terminate => {}
        };
    }

    /// `Config::default` yields the expected values for every field.
    #[test]
    fn test_default_config() {
        let config: Config = Default::default();
        assert_eq!(config.timeout, Duration::new(3, 0));
        assert_eq!(config.retry_wait, 500);
        assert_eq!(config.max_retry, 10);
        assert_eq!(config.max_retry_wait, 60000);
        assert_eq!(config.max_connection_lifetime, Duration::from_secs(0));
    }
}