json-poller 0.2.3

A lightweight, flexible, high-performance JSON polling library for Rust.
Documentation
use reqwest::Client;
use serde::de::DeserializeOwned;
use std::error::Error;
use std::future::Future;
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio::time::interval;

pub const POLL_INTERVAL_MS: u64 = 500;
pub const POOL_MAX_IDLE_PER_HOST: usize = 1;
pub const POOL_IDLE_TIMEOUT_SECS: u64 = 90;
pub const REQUEST_TIMEOUT_MS: u64 = 1000;
pub const TCP_KEEPALIVE_SECS: u64 = 60;

pub struct JsonPoller<T> {
    client: Client,
    url: String,
    poll_interval: Duration,
    _phantom: PhantomData<T>,
}

pub struct JsonPollerBuilder<T> {
    url: String,
    poll_interval_ms: u64,
    pool_max_idle_per_host: usize,
    pool_idle_timeout_secs: u64,
    request_timeout_ms: u64,
    tcp_keepalive_secs: u64,
    _phantom: PhantomData<T>,
}

impl<T> JsonPollerBuilder<T> {
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            poll_interval_ms: POLL_INTERVAL_MS,
            pool_max_idle_per_host: POOL_MAX_IDLE_PER_HOST,
            pool_idle_timeout_secs: POOL_IDLE_TIMEOUT_SECS,
            request_timeout_ms: REQUEST_TIMEOUT_MS,
            tcp_keepalive_secs: TCP_KEEPALIVE_SECS,
            _phantom: PhantomData,
        }
    }

    pub fn poll_interval_ms(mut self, ms: u64) -> Self {
        self.poll_interval_ms = ms;
        self
    }

    pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
        self.pool_max_idle_per_host = max;
        self
    }

    pub fn pool_idle_timeout_secs(mut self, secs: u64) -> Self {
        self.pool_idle_timeout_secs = secs;
        self
    }

    pub fn request_timeout_ms(mut self, ms: u64) -> Self {
        self.request_timeout_ms = ms;
        self
    }

    pub fn tcp_keepalive_secs(mut self, secs: u64) -> Self {
        self.tcp_keepalive_secs = secs;
        self
    }

    pub fn build(self) -> Result<JsonPoller<T>, reqwest::Error> {
        let client = Client::builder()
            .pool_max_idle_per_host(self.pool_max_idle_per_host)
            .pool_idle_timeout(Duration::from_secs(self.pool_idle_timeout_secs))
            .timeout(Duration::from_millis(self.request_timeout_ms))
            .tcp_keepalive(Duration::from_secs(self.tcp_keepalive_secs))
            .build()?;

        Ok(JsonPoller {
            client,
            url: self.url,
            poll_interval: Duration::from_millis(self.poll_interval_ms),
            _phantom: PhantomData,
        })
    }
}

impl<T> JsonPoller<T>
where
    T: DeserializeOwned + Send,
{
    pub fn builder(url: impl Into<String>) -> JsonPollerBuilder<T> {
        JsonPollerBuilder::new(url)
    }

    pub async fn start<F, Fut, E>(&self, mut on_data: F) -> Result<(), E>
    where
        F: FnMut(T, Duration) -> Fut + Send,
        Fut: Future<Output = Result<(), E>> + Send,
        E: std::fmt::Debug,
    {
        let mut interval_timer = interval(self.poll_interval);
        interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            interval_timer.tick().await;
            let request_start = Instant::now();
            match self.fetch().await {
                Ok(data) => {
                    let elapsed = request_start.elapsed();
                    on_data(data, elapsed).await?;
                }
                Err(e) => {
                    tracing::error!("Failed to fetch data: {:?}", e);
                    continue;
                }
            }
        }
    }

    async fn fetch(&self) -> Result<T, Box<dyn Error + Send + Sync>> {
        let response = self.client.get(&self.url).send().await?;

        if !response.status().is_success() {
            return Err(format!("HTTP {}", response.status()).into());
        }

        Ok(response.json::<T>().await?)
    }

    pub async fn fetch_once(&self) -> Result<T, Box<dyn Error + Send + Sync>> {
        self.fetch().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    struct HttpBinJson {
        slideshow: Slideshow,
    }

    #[derive(Debug, Deserialize, PartialEq)]
    struct Slideshow {
        author: String,
        date: String,
        title: String,
        slides: Vec<Slide>,
    }

    #[derive(Debug, Deserialize, PartialEq)]
    struct Slide {
        title: String,
        #[serde(rename = "type")]
        slide_type: String,
        #[serde(default)]
        items: Vec<String>,
    }

    #[test]
    fn test_builder_defaults() {
        let poller = JsonPoller::<HttpBinJson>::builder("https://example.com")
            .build()
            .unwrap();

        assert_eq!(
            poller.poll_interval,
            Duration::from_millis(POLL_INTERVAL_MS)
        );
        assert_eq!(poller.url, "https://example.com");
    }

    #[test]
    fn test_builder_custom_config() {
        let poller = JsonPoller::<HttpBinJson>::builder("https://example.com")
            .poll_interval_ms(1000)
            .request_timeout_ms(2000)
            .build()
            .unwrap();

        assert_eq!(poller.poll_interval, Duration::from_millis(1000));
    }

    #[tokio::test]
    async fn test_http_error() {
        let poller = JsonPoller::<HttpBinJson>::builder("https://httpbin.org/status/404")
            .build()
            .unwrap();

        let result = poller.fetch_once().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_invalid_json() {
        let poller = JsonPoller::<HttpBinJson>::builder("https://httpbin.org/html")
            .build()
            .unwrap();

        let result = poller.fetch_once().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_fetch_once() {
        let json_poller = JsonPoller::<HttpBinJson>::builder("https://httpbin.org/json")
            .build()
            .unwrap();
        let data = json_poller.fetch_once().await.unwrap();

        assert_eq!(data.slideshow.author, "Yours Truly");
        assert_eq!(data.slideshow.title, "Sample Slide Show");
    }
}