// libplasmoid-updater 0.2.0
//
// Library for updating KDE Plasma 6 components from the KDE Store. Meant for use in topgrade.
// Documentation
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
// API interaction based on Apdatifier (https://github.com/exequtic/apdatifier) - MIT License
// and KDE Discover (https://invent.kde.org/plasma/discover) -
// GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL

use std::{
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    thread,
    time::Duration,
};

use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

use crate::{
    types::{ComponentType, StoreEntry},
    {Error, Result},
};

use super::config::{ApiConfig, CONNECT_TIMEOUT, DEFAULT_API_CONFIG, REQUEST_TIMEOUT, USER_AGENT};
use super::ocs_parser::Meta;
use super::ocs_parser::{build_category_string, parse_ocs_response};

/// Thread-safe API client for KDE Store interactions.
///
/// Cloning is supported; because the request counter lives behind an `Arc`,
/// every clone increments and observes the same counter.
#[derive(Clone)]
pub(crate) struct ApiClient {
    /// Blocking HTTP client used for every request this type sends.
    client: reqwest::blocking::Client,
    /// Static settings read by the fetch methods: base URL, page size,
    /// retry budget and initial backoff.
    config: &'static ApiConfig,
    /// Number of HTTP requests issued so far; bumped right before each send.
    request_count: Arc<AtomicUsize>,
}

impl Default for ApiClient {
    fn default() -> Self {
        Self::new()
    }
}

impl ApiClient {
    /// Builds an API client from [`DEFAULT_API_CONFIG`].
    ///
    /// # Panics
    ///
    /// Panics, reporting the underlying error, if [`Self::with_config`] fails to
    /// build the HTTP client (e.g., TLS backend unavailable).
    pub fn new() -> Self {
        match Self::with_config(&DEFAULT_API_CONFIG) {
            Ok(api_client) => api_client,
            Err(e) => panic!("failed to create API client: {e}"),
        }
    }

    /// Builds an API client that uses `config` for all of its requests.
    ///
    /// The HTTP client is set up with the module's connect timeout, overall
    /// request timeout and user agent; the request counter starts at zero.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying HTTP client cannot be built.
    pub(super) fn with_config(config: &'static ApiConfig) -> Result<Self> {
        let builder = reqwest::blocking::Client::builder()
            .connect_timeout(CONNECT_TIMEOUT)
            .timeout(REQUEST_TIMEOUT)
            .user_agent(USER_AGENT);
        let client = builder.build()?;
        let request_count = Arc::new(AtomicUsize::new(0));

        Ok(Self {
            client,
            config,
            request_count,
        })
    }

    /// Returns a reference to the underlying HTTP client for reuse.
    pub fn http_client(&self) -> &reqwest::blocking::Client {
        &self.client
    }

    /// Number of HTTP requests sent through this client (and its clones) so far.
    ///
    /// Only compiled when the `debug` feature is enabled.
    #[cfg(feature = "debug")]
    pub fn request_count(&self) -> usize {
        let counter: &AtomicUsize = &self.request_count;
        counter.load(Ordering::Relaxed)
    }

    /// Hands out another handle to the shared request counter, suitable for
    /// passing to the installer.
    ///
    /// The returned `Arc` points at the same atomic this client increments.
    pub(crate) fn request_counter(&self) -> Arc<AtomicUsize> {
        let shared = &self.request_count;
        Arc::clone(shared)
    }

    /// Fetches all content from specified categories with parallel page fetching.
    pub fn fetch_all(&self, categories: &[ComponentType]) -> Result<Vec<StoreEntry>> {
        let category_str = build_category_string(categories);
        let base_url = self.config.base_url;
        let page_size = self.config.page_size;

        let first_url = format!(
            "{base_url}/content/data?categories={category_str}&page=0&pagesize={page_size}&sort=new"
        );

        let (first_entries, meta) = self.fetch_page(&first_url)?;
        let total_items = meta.total_items;

        if total_items <= u32::from(page_size) {
            return Ok(first_entries);
        }

        let total_pages = total_items.div_ceil(u32::from(page_size));

        if total_pages > 10 {
            log::warn!(
                target: "api",
                "store returned {total_items} items across {total_pages} pages; fetch may be slow"
            );
        }

        let remaining_pages: Vec<u32> = (1..total_pages).collect();

        let results: Vec<Result<(Vec<StoreEntry>, _)>> = remaining_pages
            .par_iter()
            .map(|&page| {
                let url = format!(
                    "{base_url}/content/data?categories={category_str}&page={page}&pagesize={page_size}&sort=new"
                );
                self.fetch_page(&url)
            })
            .collect();

        let mut all_entries = first_entries;
        let mut error_count = 0usize;
        for result in results {
            match result {
                Ok((entries, _)) => all_entries.extend(entries),
                Err(_) => error_count += 1,
            }
        }

        if error_count > 0 {
            log::warn!(target: "api", "{error_count} page{} failed to fetch", if error_count == 1 { "" } else { "s" });
        }

        Ok(all_entries)
    }

    /// Fetches content details of multiple components.
    pub fn fetch_details(&self, content_ids: &[u64]) -> Vec<Result<StoreEntry>> {
        content_ids
            .par_iter()
            .map(|&id| {
                let base_url = self.config.base_url;
                let url = format!("{base_url}/content/data/{id}");
                let (entries, _) = self.fetch_page(&url)?;
                entries
                    .into_iter()
                    .next()
                    .ok_or_else(|| Error::ComponentNotFound(format!("store content id {id}")))
            })
            .collect()
    }

    fn fetch_page(&self, url: &str) -> Result<(Vec<StoreEntry>, Meta)> {
        let mut backoff_ms = self.config.initial_backoff_ms;

        for attempt in 0..self.config.max_retries {
            self.request_count.fetch_add(1, Ordering::Relaxed);
            let r = self.client.get(url).send()?;
            let retry_after_secs = parse_retry_after(&r);

            // HTTP 429: respect Retry-After with a single retry.
            if r.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
                return match retry_after_secs {
                    Some(secs) => self.send_after(url, secs),
                    None => Err(Error::RateLimited),
                };
            }

            let xml = r.text()?;
            match parse_ocs_response(&xml) {
                Ok(result) => return Ok(result),
                // OCS rate limit with Retry-After: respect it with a single retry.
                Err(Error::RateLimited) if retry_after_secs.is_some() => {
                    return self.send_after(url, retry_after_secs.unwrap());
                }
                // Retry transient errors (including OCS rate limit without Retry-After).
                // ApiError is a deterministic OCS status — retrying wastes a request.
                Err(ref e)
                    if !matches!(e, Error::ApiError(_))
                        && attempt + 1 < self.config.max_retries =>
                {
                    thread::sleep(Duration::from_millis(backoff_ms.into()));
                    backoff_ms = backoff_ms.saturating_mul(2);
                }
                Err(e) => return Err(e),
            }
        }

        Err(Error::other("max retries exceeded"))
    }

    /// Waits out a server-requested rate-limit delay, then sends one retry request.
    ///
    /// `secs` comes from the server's `Retry-After` header and is therefore not
    /// trusted: a delay longer than five minutes is refused up front instead of
    /// blocking the calling thread for an arbitrary time.
    ///
    /// # Errors
    ///
    /// Returns [`Error::RateLimited`] if the requested delay exceeds the limit or
    /// if the retry is answered with HTTP 429 again. Send, body-read and parse
    /// failures of the retry are propagated as-is.
    fn send_after(&self, url: &str, secs: u64) -> Result<(Vec<StoreEntry>, Meta)> {
        // Longest server-requested delay this client is willing to sleep through.
        const MAX_RETRY_AFTER_SECS: u64 = 300;

        if secs > MAX_RETRY_AFTER_SECS {
            log::warn!(
                target: "api",
                "rate limited, server asked to wait {secs}s (limit {MAX_RETRY_AFTER_SECS}s); giving up"
            );
            return Err(Error::RateLimited);
        }

        log::info!(target: "api", "rate limited, retrying after {secs}s");
        thread::sleep(Duration::from_secs(secs));
        self.request_count.fetch_add(1, Ordering::Relaxed);
        let r = self.client.get(url).send()?;

        // Still rate limited after waiting: report it rather than looping.
        if r.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
            return Err(Error::RateLimited);
        }

        let xml = r.text()?;
        parse_ocs_response(&xml)
    }
}

/// Reads the `Retry-After` header of `response` as a whole number of seconds.
///
/// Returns `None` when the header is absent, is not valid visible ASCII, or does
/// not parse as a `u64` — which includes the HTTP-date form of the header.
fn parse_retry_after(response: &reqwest::blocking::Response) -> Option<u64> {
    let header = response.headers().get(reqwest::header::RETRY_AFTER)?;
    let text = header.to_str().ok()?;
    text.parse::<u64>().ok()
}