1use crate::error::{Error, Result};
4use crate::models::{ClientConfig, TelemetryBatch, TelemetryData};
5use reqwest::Client as HttpClient;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8
9#[derive(Debug, Clone)]
11pub struct Client {
12 inner: Arc<ClientInner>,
13}
14
15#[derive(Debug)]
16struct ClientInner {
17 http_client: HttpClient,
18 config: ClientConfig,
19}
20
21impl Client {
22 pub fn new(config: ClientConfig) -> Result<Self> {
24 let http_client = reqwest::Client::builder()
25 .timeout(std::time::Duration::from_secs(config.timeout_secs))
26 .build()
27 .map_err(Error::from)?;
28
29 Ok(Self {
30 inner: Arc::new(ClientInner {
31 http_client,
32 config,
33 }),
34 })
35 }
36
37 pub async fn send_telemetry(&self, batch: TelemetryBatch) -> Result<()> {
39 let url = format!("{}/api/v1/telemetry", self.inner.config.base_url);
40
41 let response = self.inner.http_client
42 .post(&url)
43 .json(&batch)
44 .send()
45 .await?;
46
47 if !response.status().is_success() {
48 let status = response.status();
49 let body = response.text().await.unwrap_or_default();
50 return Err(Error::Server(format!(
51 "Server returned {}: {}",
52 status,
53 body
54 )));
55 }
56
57 Ok(())
58 }
59
60 pub fn telemetry(&self) -> TelemetryClient {
62 TelemetryClient::new(self.clone())
63 }
64}
65
66#[derive(Debug, Clone)]
68pub struct TelemetryClient {
69 client: Client,
70 buffer: std::sync::Arc<Mutex<Vec<TelemetryData>>>,
71 batch_size: usize,
72}
73
74impl TelemetryClient {
75 pub fn new(client: Client) -> Self {
77 Self {
78 client,
79 buffer: std::sync::Arc::new(Mutex::new(Vec::with_capacity(100))),
80 batch_size: 100,
81 }
82 }
83
84 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
86 self.batch_size = batch_size;
87 self
88 }
89
90 pub async fn record(&self, data: TelemetryData) -> Result<()> {
92 let mut buffer = self.buffer.lock().await;
93 buffer.push(data);
94
95 if buffer.len() >= self.batch_size {
96 self.flush().await?;
97 }
98
99 Ok(())
100 }
101
102 pub async fn flush(&self) -> Result<()> {
104 let batch = {
105 let mut buffer = self.buffer.lock().await;
106 let batch = TelemetryBatch {
107 points: buffer.drain(..).collect(),
108 };
109 batch
110 };
111
112 if !batch.points.is_empty() {
113 self.client.send_telemetry(batch).await?;
114 }
115
116 Ok(())
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use mockito::Server;

    /// An empty batch posted to a mock server that answers 200 must succeed,
    /// and the mocked endpoint must have been hit.
    #[tokio::test]
    async fn test_send_telemetry() {
        let mut server = Server::new_async().await;

        // Expect a POST on the telemetry endpoint.
        let telemetry_mock = server
            .mock("POST", "/api/v1/telemetry")
            .with_status(200)
            .create_async()
            .await;

        // Point the client at the mock server; all other settings are defaults.
        let client = Client::new(ClientConfig {
            base_url: server.url(),
            ..Default::default()
        })
        .unwrap();

        let outcome = client
            .send_telemetry(TelemetryBatch { points: vec![] })
            .await;
        assert!(outcome.is_ok());

        telemetry_mock.assert_async().await;
    }
}