opencellid 0.2.0

Rust client library for the OpenCellID API — sync and async clients with tracing, structured errors, and bounded I/O.
Documentation
//! Blocking client implementation (powered by `reqwest::blocking::Client`).

use std::io::Read;
use std::sync::Arc;

use tracing::{debug, debug_span};

use crate::client::{
    ClientConfig, DOWNLOAD_SNIFF_BYTES, DUMP_READ_BUF_BYTES, MAX_MEASUREMENTS_PER_UPLOAD,
    MAX_RESPONSE_BYTES, MAX_UPLOAD_BYTES,
};
use crate::error::{Error, ParseError, Result};
#[cfg(feature = "csv")]
use crate::internal::endpoint::add_get_in_area_params;
use crate::internal::endpoint::{
    Endpoint, add_cell_get_params, add_dump_params, add_get_in_area_size_params,
    add_measurement_params, build_url, build_url_with_token, prepare_upload,
};
use crate::internal::parse::{
    check_upload_response, finalize_response_body, parse_diff_listing, parse_json,
    validate_dump_head,
};
use crate::internal::tracing::redact_api_key;
#[cfg(feature = "csv")]
use crate::params::GetCellsInAreaParams;
use crate::params::AreaQuery;
use crate::types::{Cell, CellCount, CellKey, DumpKind, DumpListing, MeasurementsPayload};


/// Shared state behind a [`BlockingClient`].
///
/// Stored inside an `Arc`, so every clone of the client refers to the same
/// configuration and the same HTTP client.
#[derive(Debug)]
struct Inner {
    /// Client configuration; methods in this module read `base_url`, `api_key`,
    /// `download_timeout` and `max_dump_bytes` from it.
    config: ClientConfig,
    /// Blocking `reqwest` client used to send every request.
    http: reqwest::blocking::Client,
}

/// Synchronous OpenCellID client.
///
/// Mirrors [`crate::Client`] but issues blocking calls. Cheap to clone: the
/// only field is an `Arc`, so a clone shares the existing state instead of
/// copying it.
///
/// **Do not** invoke blocking methods from inside a Tokio runtime; use
/// [`tokio::task::spawn_blocking`] or call from native threads instead.
#[derive(Debug, Clone)]
pub struct BlockingClient {
    /// Reference-counted configuration and HTTP client shared by all clones.
    inner: Arc<Inner>,
}

impl BlockingClient {
    /// Assemble a client from an already-built configuration and HTTP client.
    ///
    /// Used by [`crate::ClientBuilder::build_blocking`]. Both parts are moved
    /// into one reference-counted allocation that every clone shares.
    pub(crate) fn from_parts(config: ClientConfig, http: reqwest::blocking::Client) -> Self {
        let shared = Inner { config, http };
        Self {
            inner: Arc::new(shared),
        }
    }

    /// Fetch the cell identified by its `(mcc, mnc, lac, cell_id)` key.
    ///
    /// The four key components are attached to the tracing span for the call.
    ///
    /// # Errors
    ///
    /// See [`crate::Client::get_cell`].
    pub fn get_cell(&self, key: CellKey) -> Result<Cell> {
        let span = debug_span!(
            "opencellid.get_cell",
            mcc = key.mcc,
            mnc = key.mnc,
            lac = key.lac,
            cell_id = key.cell_id,
        );
        let _guard = span.entered();

        let cfg = &self.inner.config;
        let mut url = build_url(&cfg.base_url, Endpoint::CellGet, &cfg.api_key)?;
        add_cell_get_params(&mut url, &key);
        self.get_json(url)
    }

    /// Return how many cells match `query`, without fetching the cells themselves.
    ///
    /// # Errors
    ///
    /// See [`crate::Client::get_cell`].
    pub fn get_cells_in_area_size(&self, query: AreaQuery) -> Result<CellCount> {
        let _guard = debug_span!("opencellid.get_cells_in_area_size").entered();

        let cfg = &self.inner.config;
        let mut url = build_url(&cfg.base_url, Endpoint::CellGetInAreaSize, &cfg.api_key)?;
        add_get_in_area_size_params(&mut url, &query);
        self.get_json(url)
    }

    /// Fetch one page of cells inside a bounding box (CSV format).
    ///
    /// The response body is read as text and then parsed as CSV into [`Cell`]s.
    ///
    /// # Errors
    ///
    /// See [`crate::Client::get_cell`].
    #[cfg(feature = "csv")]
    #[cfg_attr(docsrs, doc(cfg(feature = "csv")))]
    pub fn get_cells_in_area(&self, params: GetCellsInAreaParams) -> Result<Vec<Cell>> {
        let _guard = debug_span!("opencellid.get_cells_in_area").entered();

        let cfg = &self.inner.config;
        let mut url = build_url(&cfg.base_url, Endpoint::CellGetInArea, &cfg.api_key)?;
        add_get_in_area_params(&mut url, &params);
        self.get_text(url)
            .and_then(|csv| crate::internal::parse::parse_cells_csv(&csv))
    }

    /// Submit a single measurement (`measure/add`).
    ///
    /// The measurement is validated locally first, so invalid input fails
    /// before any request is sent.
    ///
    /// # Errors
    ///
    /// See [`crate::Client::add_measurement`].
    pub fn add_measurement(&self, m: &crate::types::Measurement) -> Result<()> {
        m.validate()?;

        let span = debug_span!("opencellid.add_measurement", mcc = m.mcc, mnc = m.mnc);
        let _guard = span.entered();

        let cfg = &self.inner.config;
        let mut url = build_url(&cfg.base_url, Endpoint::MeasureAdd, &cfg.api_key)?;
        add_measurement_params(&mut url, m);
        // Only success or failure matters here; the response text is discarded.
        self.get_text(url).map(drop)
    }

    /// Bulk-upload measurements as CSV (`measure/uploadCsv`, multipart, max 2 MiB).
    ///
    /// `csv` is converted into an owned byte buffer and sent as the upload body.
    ///
    /// # Errors
    ///
    /// See [`crate::Client::upload_csv`].
    pub fn upload_csv(&self, csv: impl Into<Vec<u8>>) -> Result<()> {
        let bytes: Vec<u8> = csv.into();
        self.upload_multipart(Endpoint::MeasureUploadCsv, bytes)
    }

    /// Bulk-upload measurements as JSON (`measure/uploadJson`, multipart, max 2 MiB).
    ///
    /// # Errors
    ///
    /// See [`crate::Client::upload_csv`].
    pub fn upload_json(&self, payload: &MeasurementsPayload) -> Result<()> {
        if payload.measurements.len() > MAX_MEASUREMENTS_PER_UPLOAD {
            return Err(Error::InvalidInput(format!(
                "measurements batch is {} entries, exceeds {} limit",
                payload.measurements.len(),
                MAX_MEASUREMENTS_PER_UPLOAD
            )));
        }
        for m in &payload.measurements {
            m.validate()?;
        }
        let body = serde_json::to_vec(payload)
            .map_err(|e| Error::Parse(ParseError::with_source("serialise payload", e)))?;
        self.upload_multipart(Endpoint::MeasureUploadJson, body)
    }

    /// Bulk-upload measurements in CLF3 format (`measure/uploadClf`, multipart, max 2 MiB).
    ///
    /// `clf` is converted into an owned byte buffer and sent as the upload body.
    ///
    /// # Errors
    ///
    /// See [`crate::Client::upload_csv`].
    pub fn upload_clf(&self, clf: impl Into<Vec<u8>>) -> Result<()> {
        let bytes: Vec<u8> = clf.into();
        self.upload_multipart(Endpoint::MeasureUploadClf, bytes)
    }

    fn upload_multipart(&self, endpoint: Endpoint, body: Vec<u8>) -> Result<()> {
        let prepared = prepare_upload(endpoint, body.len(), MAX_UPLOAD_BYTES)?;
        let _span = debug_span!(
            "opencellid.upload",
            endpoint = endpoint.path(),
            bytes = body.len()
        )
        .entered();
        let url = build_url(&self.inner.config.base_url, endpoint, &self.inner.config.api_key)?;
        debug!(url = %redact_api_key(&url), "POST multipart");
        let part = reqwest::blocking::multipart::Part::bytes(body)
            .file_name(prepared.filename)
            .mime_str(prepared.mime)
            .map_err(|e| Error::InvalidInput(format!("mime: {e}")))?;
        // See ClientConfig::api_key for why a copy of the key is unavoidable.
        let form = reqwest::blocking::multipart::Form::new()
            .text("key", self.inner.config.api_key.to_string())
            .part("datafile", part);
        let resp = self.inner.http.post(url).multipart(form).send()?;
        let body = read_text_with_limit(resp)?;
        check_upload_response(&body)
    }

    /// Issue a GET for `url` and return the size-limited response body as text.
    ///
    /// The URL is logged at debug level with the API key redacted.
    fn get_text(&self, url: url::Url) -> Result<String> {
        debug!(url = %redact_api_key(&url), "GET");
        let request = self.inner.http.get(url);
        let response = request.send()?;
        read_text_with_limit(response)
    }

    /// GET `url` and deserialise the response text as JSON into `T`.
    fn get_json<T: for<'de> serde::Deserialize<'de>>(&self, url: url::Url) -> Result<T> {
        self.get_text(url).and_then(|text| parse_json(&text))
    }

    /// Stream a gzipped CSV dump to `writer`. See [`crate::Client::download_dump`]
    /// for the full contract.
    ///
    /// # Errors
    ///
    /// Same as [`crate::Client::download_dump`].
    pub fn download_dump<W: std::io::Write>(
        &self,
        kind: DumpKind,
        writer: &mut W,
    ) -> Result<u64> {
        let _span = debug_span!(
            "opencellid.download_dump",
            kind = dump_kind_tag(&kind),
            bytes = tracing::field::Empty,
        )
        .entered();
        let mut url = build_url_with_token(
            &self.inner.config.base_url,
            Endpoint::Downloads,
            &self.inner.config.api_key,
        )?;
        add_dump_params(&mut url, &kind)?;
        debug!(url = %redact_api_key(&url), "GET dump");

        let mut resp = self
            .inner
            .http
            .get(url)
            .timeout(self.inner.config.download_timeout)
            .send()?;
        let max_dump_bytes = self.inner.config.max_dump_bytes;
        if let Some(len) = resp.content_length() {
            if len > max_dump_bytes {
                return Err(Error::Parse(ParseError::new(format!(
                    "dump body advertised {len} bytes, exceeds {max_dump_bytes} limit"
                ))));
            }
        }

        let status = resp.status();
        let mut head = vec![0u8; DOWNLOAD_SNIFF_BYTES];
        let n = fill_buf_best_effort(&mut resp, &mut head)?;
        head.truncate(n);
        validate_dump_head(status, &head)?;

        writer
            .write_all(&head)
            .map_err(|e| Error::Parse(ParseError::with_source("write dump body", e)))?;
        let mut total = head.len() as u64;
        let mut buf = [0u8; DUMP_READ_BUF_BYTES];
        loop {
            let read = resp
                .read(&mut buf)
                .map_err(|e| Error::Parse(ParseError::with_source("read dump body", e)))?;
            if read == 0 {
                break;
            }
            if total + read as u64 > max_dump_bytes {
                return Err(Error::Parse(ParseError::new(format!(
                    "dump body exceeded {max_dump_bytes} byte limit"
                ))));
            }
            writer
                .write_all(&buf[..read])
                .map_err(|e| Error::Parse(ParseError::with_source("write dump body", e)))?;
            total += read as u64;
        }
        writer
            .flush()
            .map_err(|e| Error::Parse(ParseError::with_source("flush dump body", e)))?;
        tracing::Span::current().record("bytes", total);
        tracing::trace!(bytes = total, "dump streamed");
        Ok(total)
    }

    /// See [`crate::Client::download_dump_to_path`].
    pub fn download_dump_to_path(
        &self,
        kind: DumpKind,
        path: impl AsRef<std::path::Path>,
    ) -> Result<u64> {
        let final_path = path.as_ref().to_path_buf();
        let mut part_os = final_path.as_os_str().to_owned();
        part_os.push(".part");
        let part_path = std::path::PathBuf::from(part_os);

        let mut file = std::fs::File::create(&part_path)
            .map_err(|e| Error::Parse(ParseError::with_source("create dump file", e)))?;
        match self.download_dump(kind, &mut file) {
            Ok(n) => {
                drop(file);
                std::fs::rename(&part_path, &final_path).map_err(|e| {
                    Error::Parse(ParseError::with_source("rename dump file", e))
                })?;
                Ok(n)
            }
            Err(e) => {
                drop(file);
                if let Err(rm_err) = std::fs::remove_file(&part_path) {
                    tracing::warn!(
                        error = %rm_err,
                        path = %part_path.display(),
                        "failed to remove partial dump"
                    );
                }
                Err(e)
            }
        }
    }

    /// See [`crate::Client::list_daily_diffs`].
    ///
    /// Fetches the downloads listing as text and extracts the entries with
    /// `parse_diff_listing`.
    pub fn list_daily_diffs(&self) -> Result<Vec<DumpListing>> {
        let _guard = debug_span!("opencellid.list_daily_diffs").entered();

        let cfg = &self.inner.config;
        let url = build_url_with_token(&cfg.base_url, Endpoint::DownloadsList, &cfg.api_key)?;
        self.get_text(url).map(|html| parse_diff_listing(&html))
    }
}

/// Best-effort fill of `buf` from `reader`. Stops at EOF or when `buf` is full;
/// returns the number of bytes actually read (≤ `buf.len()`).
fn fill_buf_best_effort<R: std::io::Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
    let mut filled = 0usize;
    while filled < buf.len() {
        let n = reader
            .read(&mut buf[filled..])
            .map_err(|e| Error::Parse(ParseError::with_source("read dump head", e)))?;
        if n == 0 {
            break;
        }
        filled += n;
    }
    Ok(filled)
}

/// Stable string tag for tracing spans — `world | mcc | diff`.
fn dump_kind_tag(kind: &DumpKind) -> &'static str {
    match kind {
        DumpKind::World => "world",
        DumpKind::Country(_) => "mcc",
        DumpKind::Daily { .. } => "diff",
    }
}

/// Read the response body up to [`MAX_RESPONSE_BYTES`].
///
/// Reads one byte beyond the limit so the post-check distinguishes "exactly N
/// bytes" from "more than N bytes".
fn read_text_with_limit(resp: reqwest::blocking::Response) -> Result<String> {
    let status = resp.status();
    let cap = resp
        .content_length()
        .map(|n| (n as usize).min(MAX_RESPONSE_BYTES))
        .unwrap_or(8 * 1024);
    if let Some(len) = resp.content_length() {
        if len > MAX_RESPONSE_BYTES as u64 {
            return Err(Error::Parse(ParseError::new(format!(
                "response body advertised {len} bytes, exceeds {MAX_RESPONSE_BYTES} limit"
            ))));
        }
    }
    let mut buf: Vec<u8> = Vec::with_capacity(cap);
    resp.take(MAX_RESPONSE_BYTES as u64 + 1)
        .read_to_end(&mut buf)
        .map_err(|e| Error::Parse(ParseError::with_source("response read", e)))?;
    if buf.len() > MAX_RESPONSE_BYTES {
        return Err(Error::Parse(ParseError::new(format!(
            "response body exceeded {MAX_RESPONSE_BYTES} byte limit"
        ))));
    }
    finalize_response_body(status, buf)
}