atomcode_telemetry/sender/
http.rs1use flate2::write::GzEncoder;
4use flate2::Compression;
5use reqwest::{Client, StatusCode};
6use std::io::Write;
7use std::path::Path;
8use std::time::Duration;
9use thiserror::Error;
10
11#[derive(Debug, Error)]
12pub enum SendError {
13 #[error("io: {0}")]
14 Io(#[from] std::io::Error),
15 #[error("http: {0}")]
16 Http(#[from] reqwest::Error),
17 #[error("bad request")]
18 BadRequest,
19 #[error("unauthorized")]
20 Unauthorized,
21 #[error("payload too large")]
22 PayloadTooLarge,
23 #[error("rate limited (retry after {0:?})")]
24 RateLimited(Option<Duration>),
25 #[error("server error: {0}")]
26 Server(u16),
27 #[error("network / other")]
28 Other,
29}
30
31pub struct HttpSender {
32 client: Client,
33 endpoint: String,
34}
35
36impl HttpSender {
37 pub fn new(endpoint: String, app_version: String) -> Self {
38 let client = Client::builder()
39 .timeout(Duration::from_secs(10))
40 .user_agent(format!("atomcode-telemetry/{}", app_version))
41 .build()
42 .expect("reqwest client build");
43 Self { client, endpoint }
44 }
45
46 pub async fn send_segment(&self, path: &Path, dropped: u64) -> Result<(), SendError> {
48 let body = std::fs::read(path)?;
49 let gz = gzip(&body)?;
50 let resp = self
51 .client
52 .post(&self.endpoint)
53 .header("content-type", "application/x-ndjson")
54 .header("content-encoding", "gzip")
55 .header("x-atomcode-dropped", dropped.to_string())
56 .header("x-atomcode-schema", crate::SCHEMA_VERSION.to_string())
57 .body(gz)
58 .send()
59 .await?;
60 map_status(resp.status(), resp.headers())
61 }
62}
63
64fn gzip(buf: &[u8]) -> std::io::Result<Vec<u8>> {
65 let mut enc = GzEncoder::new(Vec::with_capacity(buf.len() / 2), Compression::fast());
66 enc.write_all(buf)?;
67 enc.finish()
68}
69
70fn map_status(s: StatusCode, h: &reqwest::header::HeaderMap) -> Result<(), SendError> {
71 match s.as_u16() {
72 200 | 202 => Ok(()),
73 400 => Err(SendError::BadRequest),
74 401 | 403 => Err(SendError::Unauthorized),
75 413 => Err(SendError::PayloadTooLarge),
76 429 => {
77 let ra = h
78 .get("retry-after")
79 .and_then(|v| v.to_str().ok())
80 .and_then(|v| v.parse::<u64>().ok())
81 .map(Duration::from_secs);
82 Err(SendError::RateLimited(ra))
83 }
84 c if (500..600).contains(&c) => Err(SendError::Server(c)),
85 _ => Err(SendError::Other),
86 }
87}