opencellid 0.1.0

Rust client library for the OpenCellID API — sync and async clients with tracing, structured errors, and bounded I/O.
Documentation
//! Async client implementation (powered by `reqwest::Client`).

use std::sync::Arc;

use tracing::{Instrument, debug, debug_span};

use crate::client::{ClientConfig, MAX_RESPONSE_BYTES, MAX_UPLOAD_BYTES};
use crate::error::{Error, ParseError, Result};
#[cfg(feature = "csv")]
use crate::internal::endpoint::add_get_in_area_params;
use crate::internal::endpoint::{
    Endpoint, add_cell_get_params, add_get_in_area_size_params, add_measurement_params, build_url,
    prepare_upload,
};
use crate::internal::parse::{check_upload_response, finalize_response_body, parse_json};
use crate::internal::tracing::redact_api_key;
#[cfg(feature = "csv")]
use crate::params::GetCellsInAreaParams;
use crate::params::AreaQuery;
use crate::types::{Cell, CellCount, CellKey, MeasurementsPayload};

/// Upper bound on `MeasurementsPayload::measurements.len()`, checked before the
/// payload is serialised so that a huge batch is rejected without first building
/// a correspondingly huge `Vec<u8>`.
///
/// This is a coarse count guard, not the byte limit. At ~256 bytes of JSON per
/// measurement, 8 000 entries come to ~2.05 MB. That is only just under the
/// 2 MiB [`MAX_UPLOAD_BYTES`] cap, and larger entries can exceed it. The
/// serialised body is still checked against [`MAX_UPLOAD_BYTES`] (via
/// `prepare_upload`) before anything is sent.
const MAX_MEASUREMENTS_PER_UPLOAD: usize = 8_000;

/// State shared by every clone of [`Client`]: the resolved configuration plus the
/// HTTP client that performs requests. Kept behind an `Arc` inside `Client`.
#[derive(Debug)]
struct Inner {
    /// Settings (base URL, API key, ...) used to build each request URL.
    config: ClientConfig,
    /// The `reqwest` client (and its connection pool) used for every request.
    http: reqwest::Client,
}

/// Asynchronous OpenCellID client.
///
/// Cloning is cheap. Every clone shares one [`Arc`] that holds the configuration
/// and the underlying [`reqwest::Client`], and therefore its connection pool.
/// Build one instance and reuse it for many requests.
///
/// # Cancellation
///
/// The mutating methods (`add_measurement`, `upload_*`) are not transactional.
/// If their future is dropped after `send().await` has started, the request may
/// already have reached the server, and nothing rolls it back. A retry after
/// such a cancellation can apply the same data twice, so treat these calls as
/// at-least-once.
#[derive(Clone, Debug)]
pub struct Client {
    inner: Arc<Inner>,
}

impl Client {
    /// Internal constructor used by [`crate::ClientBuilder::build`].
    pub(crate) fn from_parts(config: ClientConfig, http: reqwest::Client) -> Self {
        Self { inner: Arc::new(Inner { config, http }) }
    }

    /// Look up a cell by `(mcc, mnc, lac, cell_id)`.
    ///
    /// # Errors
    ///
    /// - [`Error::Api`] with [`crate::ApiErrorCode::CellNotFound`] when the cell is
    ///   not in the database.
    /// - Other [`Error::Api`] variants for invalid keys, rate limits, etc.
    /// - [`Error::Transport`] for HTTP / TLS failures.
    pub async fn get_cell(&self, key: CellKey) -> Result<Cell> {
        let span = debug_span!(
            "opencellid.get_cell",
            mcc = key.mcc,
            mnc = key.mnc,
            lac = key.lac,
            cell_id = key.cell_id,
        );
        async move {
            let mut url = build_url(
                &self.inner.config.base_url,
                Endpoint::CellGet,
                &self.inner.config.api_key,
            )?;
            add_cell_get_params(&mut url, &key);
            self.get_json::<Cell>(url).await
        }
        .instrument(span)
        .await
    }

    /// Count cells matching `query` without paging through them.
    ///
    /// # Errors
    ///
    /// See [`Self::get_cell`].
    pub async fn get_cells_in_area_size(&self, query: AreaQuery) -> Result<CellCount> {
        let span = debug_span!("opencellid.get_cells_in_area_size");
        async move {
            let mut url = build_url(
                &self.inner.config.base_url,
                Endpoint::CellGetInAreaSize,
                &self.inner.config.api_key,
            )?;
            add_get_in_area_size_params(&mut url, &query);
            self.get_json::<CellCount>(url).await
        }
        .instrument(span)
        .await
    }

    /// Page through cells inside a bounding box (CSV format).
    ///
    /// # Errors
    ///
    /// See [`Self::get_cell`].
    #[cfg(feature = "csv")]
    #[cfg_attr(docsrs, doc(cfg(feature = "csv")))]
    pub async fn get_cells_in_area(&self, params: GetCellsInAreaParams) -> Result<Vec<Cell>> {
        let span = debug_span!("opencellid.get_cells_in_area");
        async move {
            let mut url = build_url(
                &self.inner.config.base_url,
                Endpoint::CellGetInArea,
                &self.inner.config.api_key,
            )?;
            add_get_in_area_params(&mut url, &params);
            let body = self.get_text(url).await?;
            crate::internal::parse::parse_cells_csv(&body)
        }
        .instrument(span)
        .await
    }

    /// Submit a single measurement (`measure/add`).
    ///
    /// # Errors
    ///
    /// See [`Self::get_cell`]. Note that this operation is non-idempotent on the
    /// server side; see the [Cancellation](Self) note above.
    pub async fn add_measurement(&self, m: &crate::types::Measurement) -> Result<()> {
        m.validate()?;
        let span = debug_span!("opencellid.add_measurement", mcc = m.mcc, mnc = m.mnc);
        async move {
            let mut url = build_url(
                &self.inner.config.base_url,
                Endpoint::MeasureAdd,
                &self.inner.config.api_key,
            )?;
            add_measurement_params(&mut url, m);
            let _ = self.get_text(url).await?;
            Ok(())
        }
        .instrument(span)
        .await
    }

    /// Bulk-upload measurements as CSV (`measure/uploadCsv`, multipart, max 2 MiB).
    ///
    /// The CSV header expected by OpenCellID is
    /// `mcc,mnc,lac,cellid,lon,lat,signal,measured_at,rating,speed,direction,act,...`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidInput`] when the body exceeds 2 MiB; see
    /// [`Self::get_cell`] for other variants.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn run(client: &opencellid::Client) -> opencellid::Result<()> {
    /// let csv = b"mcc,mnc,lac,cellid,lon,lat,signal,measured_at,act\n\
    ///             250,1,7,42,37.6,55.7,-95,2024-01-02 03:04:05,LTE\n".to_vec();
    /// client.upload_csv(csv).await?;
    /// # Ok(()) }
    /// ```
    pub async fn upload_csv(&self, csv: impl Into<Vec<u8>>) -> Result<()> {
        self.upload_multipart(Endpoint::MeasureUploadCsv, csv.into()).await
    }

    /// Bulk-upload measurements as JSON (`measure/uploadJson`, multipart, max 2 MiB).
    ///
    /// # Errors
    ///
    /// See [`Self::upload_csv`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn run(client: &opencellid::Client) -> opencellid::Result<()> {
    /// use opencellid::{Measurement, MeasurementsPayload, Radio};
    /// let mut payload = MeasurementsPayload::new();
    /// payload.measurements.push(
    ///     Measurement::new(55.7558, 37.6173, 250, 1, 7, 42, Radio::Lte)?
    ///         .with_signal(-95),
    /// );
    /// client.upload_json(&payload).await?;
    /// # Ok(()) }
    /// ```
    pub async fn upload_json(&self, payload: &MeasurementsPayload) -> Result<()> {
        if payload.measurements.len() > MAX_MEASUREMENTS_PER_UPLOAD {
            return Err(Error::InvalidInput(format!(
                "measurements batch is {} entries, exceeds {} limit",
                payload.measurements.len(),
                MAX_MEASUREMENTS_PER_UPLOAD
            )));
        }
        for m in &payload.measurements {
            m.validate()?;
        }
        let body = serde_json::to_vec(payload)
            .map_err(|e| Error::Parse(ParseError::with_source("serialise payload", e)))?;
        self.upload_multipart(Endpoint::MeasureUploadJson, body).await
    }

    /// Bulk-upload measurements in CLF3 format (`measure/uploadClf`, multipart, max 2 MiB).
    ///
    /// # Errors
    ///
    /// See [`Self::upload_csv`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # async fn run(client: &opencellid::Client) -> opencellid::Result<()> {
    /// // CLF3 is a semicolon-separated text format; see the OpenCellID wiki.
    /// let clf = b"// mcc+mnc;lac;cellid;rnc;lat;lon;ratio;data;rfu\n".to_vec();
    /// client.upload_clf(clf).await?;
    /// # Ok(()) }
    /// ```
    pub async fn upload_clf(&self, clf: impl Into<Vec<u8>>) -> Result<()> {
        self.upload_multipart(Endpoint::MeasureUploadClf, clf.into()).await
    }

    async fn upload_multipart(&self, endpoint: Endpoint, body: Vec<u8>) -> Result<()> {
        let prepared = prepare_upload(endpoint, body.len(), MAX_UPLOAD_BYTES)?;
        let span = debug_span!(
            "opencellid.upload",
            endpoint = endpoint.path(),
            bytes = body.len()
        );
        async move {
            let url = build_url(&self.inner.config.base_url, endpoint, &self.inner.config.api_key)?;
            debug!(url = %redact_api_key(&url), "POST multipart");
            let part = reqwest::multipart::Part::bytes(body)
                .file_name(prepared.filename)
                .mime_str(prepared.mime)
                .map_err(|e| Error::InvalidInput(format!("mime: {e}")))?;
            // See ClientConfig::api_key for why a copy of the key is unavoidable.
            let form = reqwest::multipart::Form::new()
                .text("key", self.inner.config.api_key.to_string())
                .part("datafile", part);
            let resp = self.inner.http.post(url).multipart(form).send().await?;
            let body = read_text_with_limit(resp).await?;
            check_upload_response(&body)
        }
        .instrument(span)
        .await
    }

    async fn get_text(&self, url: url::Url) -> Result<String> {
        debug!(url = %redact_api_key(&url), "GET");
        let resp = self.inner.http.get(url).send().await?;
        read_text_with_limit(resp).await
    }

    async fn get_json<T: for<'de> serde::Deserialize<'de>>(&self, url: url::Url) -> Result<T> {
        let body = self.get_text(url).await?;
        parse_json(&body)
    }
}

/// Read a response body into memory, buffering at most [`MAX_RESPONSE_BYTES`],
/// and hand it to [`finalize_response_body`] together with the HTTP status.
///
/// Size handling depends on the status:
///
/// - **Success (2xx):** a body that advertises (via `Content-Length`) or streams
///   more than [`MAX_RESPONSE_BYTES`] is rejected with [`Error::Parse`] as soon
///   as the overflow is seen. The excess is never buffered.
/// - **Any other status:** the cap still bounds memory, but an oversized body is
///   truncated instead of rejected. The HTTP status therefore reaches
///   `finalize_response_body` rather than being masked by a size error.
///
/// # Errors
///
/// - A transport error (converted via `?`) when reading a body chunk fails.
/// - [`Error::Parse`] when a successful response exceeds the size cap.
/// - Whatever [`finalize_response_body`] returns for the status/body pair.
async fn read_text_with_limit(mut resp: reqwest::Response) -> Result<String> {
    let status = resp.status();
    // For error statuses the status code is the useful diagnostic; an oversized
    // error page must not turn it into a size error.
    let surface_status = !status.is_success();
    let advertised = resp.content_length();

    if let Some(len) = advertised {
        if !surface_status && len > MAX_RESPONSE_BYTES as u64 {
            return Err(Error::Parse(ParseError::new(format!(
                "response body advertised {len} bytes, exceeds {MAX_RESPONSE_BYTES} limit"
            ))));
        }
    }

    // Pre-size from Content-Length, clamped to the cap. The conversion saturates
    // where the u64 length would not fit in usize, instead of silently
    // truncating. An unknown length (e.g. chunked) starts with a small buffer.
    let capacity = advertised.map_or(8 * 1024, |len| {
        usize::try_from(len).unwrap_or(usize::MAX).min(MAX_RESPONSE_BYTES)
    });
    let mut buf: Vec<u8> = Vec::with_capacity(capacity);

    while let Some(chunk) = resp.chunk().await? {
        // Invariant: buf.len() <= MAX_RESPONSE_BYTES, so this cannot underflow.
        let room = MAX_RESPONSE_BYTES - buf.len();
        if chunk.len() <= room {
            buf.extend_from_slice(&chunk);
            continue;
        }
        if !surface_status {
            return Err(Error::Parse(ParseError::new(format!(
                "response body exceeded {MAX_RESPONSE_BYTES} byte limit"
            ))));
        }
        // Error status: keep what fits and stop reading.
        buf.extend_from_slice(&chunk[..room]);
        // The cut may split a multi-byte UTF-8 sequence. Drop only an
        // *incomplete* trailing sequence so a text body stays decodable.
        if let Err(e) = std::str::from_utf8(&buf) {
            if e.error_len().is_none() {
                buf.truncate(e.valid_up_to());
            }
        }
        break;
    }
    finalize_response_body(status, buf)
}