Skip to main content

atomcode_telemetry/sender/
http.rs

1//! HTTP POST of gzipped NDJSON segment files.
2
3use flate2::write::GzEncoder;
4use flate2::Compression;
5use reqwest::{Client, StatusCode};
6use std::io::Write;
7use std::path::Path;
8use std::time::Duration;
9use thiserror::Error;
10
11#[derive(Debug, Error)]
12pub enum SendError {
13    #[error("io: {0}")]
14    Io(#[from] std::io::Error),
15    #[error("http: {0}")]
16    Http(#[from] reqwest::Error),
17    #[error("bad request")]
18    BadRequest,
19    #[error("unauthorized")]
20    Unauthorized,
21    #[error("payload too large")]
22    PayloadTooLarge,
23    #[error("rate limited (retry after {0:?})")]
24    RateLimited(Option<Duration>),
25    #[error("server error: {0}")]
26    Server(u16),
27    #[error("network / other")]
28    Other,
29}
30
31pub struct HttpSender {
32    client: Client,
33    endpoint: String,
34}
35
36impl HttpSender {
37    pub fn new(endpoint: String, app_version: String) -> Self {
38        let client = Client::builder()
39            .timeout(Duration::from_secs(10))
40            .user_agent(format!("atomcode-telemetry/{}", app_version))
41            .build()
42            .expect("reqwest client build");
43        Self { client, endpoint }
44    }
45
46    /// Send one segment file. `dropped` is cumulative since process start.
47    pub async fn send_segment(&self, path: &Path, dropped: u64) -> Result<(), SendError> {
48        let body = std::fs::read(path)?;
49        let gz = gzip(&body)?;
50        let resp = self
51            .client
52            .post(&self.endpoint)
53            .header("content-type", "application/x-ndjson")
54            .header("content-encoding", "gzip")
55            .header("x-atomcode-dropped", dropped.to_string())
56            .header("x-atomcode-schema", crate::SCHEMA_VERSION.to_string())
57            .body(gz)
58            .send()
59            .await?;
60        map_status(resp.status(), resp.headers())
61    }
62}
63
64fn gzip(buf: &[u8]) -> std::io::Result<Vec<u8>> {
65    let mut enc = GzEncoder::new(Vec::with_capacity(buf.len() / 2), Compression::fast());
66    enc.write_all(buf)?;
67    enc.finish()
68}
69
70fn map_status(s: StatusCode, h: &reqwest::header::HeaderMap) -> Result<(), SendError> {
71    match s.as_u16() {
72        200 | 202 => Ok(()),
73        400 => Err(SendError::BadRequest),
74        401 | 403 => Err(SendError::Unauthorized),
75        413 => Err(SendError::PayloadTooLarge),
76        429 => {
77            let ra = h
78                .get("retry-after")
79                .and_then(|v| v.to_str().ok())
80                .and_then(|v| v.parse::<u64>().ok())
81                .map(Duration::from_secs);
82            Err(SendError::RateLimited(ra))
83        }
84        c if (500..600).contains(&c) => Err(SendError::Server(c)),
85        _ => Err(SendError::Other),
86    }
87}