matomo-rs 0.1.0

Async client for the Matomo Reporting API, focused on data export and migration
Documentation
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::Stream;
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::endpoints::{
    ActionsGetDownloads, ActionsGetOutlinks, ActionsGetPageTitles, ActionsGetPageUrls,
    LiveGetLastVisitsDetails, ReferrersGetAll, ReferrersGetReferrerType, VisitsSummaryGet,
};
use crate::error::{Error, Result};
use crate::models::{
    ActionPage, Download, Outlink, ReferrerAll, ReferrerType, Visit, VisitsSummary,
};
use crate::params::{IdSite, Limit, Period, Segment};
use crate::request::Params;
use crate::reqwest::MatomoClient;

/// Handle for the `VisitsSummary` module.
///
/// Holds a borrowed [`MatomoClient`] for the lifetime `'a`. The handle is
/// `Copy`, so it can be passed around by value.
#[derive(Clone, Copy)]
pub struct VisitsSummaryHandle<'a> {
    /// Client through which this handle's queries are issued.
    client: &'a MatomoClient,
}

impl<'a> VisitsSummaryHandle<'a> {
    /// Wrap a borrowed client in a `VisitsSummary` handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Run a [`VisitsSummaryGet`] query for the given site, period and
    /// optional segment.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client query produces.
    pub async fn get(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<VisitsSummary> {
        let request = VisitsSummaryGet {
            id_site: id_site.into(),
            period,
            segment,
        };
        self.client.query(request).await
    }
}

/// Handle for the `Actions` module.
///
/// Holds a borrowed [`MatomoClient`] for the lifetime `'a`. The handle is
/// `Copy`, so it can be passed around by value.
#[derive(Clone, Copy)]
pub struct ActionsHandle<'a> {
    /// Client through which this handle's queries are issued.
    client: &'a MatomoClient,
}

impl<'a> ActionsHandle<'a> {
    /// Wrap a borrowed client in an `Actions` handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Run an [`ActionsGetPageUrls`] query for the given site, period and
    /// optional segment.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client query produces.
    pub async fn get_page_urls(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ActionPage>> {
        let request = ActionsGetPageUrls {
            id_site: id_site.into(),
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Run an [`ActionsGetPageTitles`] query for the given site, period and
    /// optional segment.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client query produces.
    pub async fn get_page_titles(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ActionPage>> {
        let request = ActionsGetPageTitles {
            id_site: id_site.into(),
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Run an [`ActionsGetDownloads`] query for the given site, period and
    /// optional segment.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client query produces.
    pub async fn get_downloads(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<Download>> {
        let request = ActionsGetDownloads {
            id_site: id_site.into(),
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Run an [`ActionsGetOutlinks`] query for the given site, period and
    /// optional segment.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client query produces.
    pub async fn get_outlinks(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<Outlink>> {
        let request = ActionsGetOutlinks {
            id_site: id_site.into(),
            period,
            segment,
        };
        self.client.query(request).await
    }
}

/// Handle for the `Referrers` module.
///
/// Holds a borrowed [`MatomoClient`] for the lifetime `'a`. The handle is
/// `Copy`, so it can be passed around by value.
#[derive(Clone, Copy)]
pub struct ReferrersHandle<'a> {
    /// Client through which this handle's queries are issued.
    client: &'a MatomoClient,
}

impl<'a> ReferrersHandle<'a> {
    /// Wrap a borrowed client in a `Referrers` handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Run a [`ReferrersGetReferrerType`] query for the given site, period
    /// and optional segment.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client query produces.
    pub async fn get_referrer_type(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ReferrerType>> {
        let request = ReferrersGetReferrerType {
            id_site: id_site.into(),
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Run a [`ReferrersGetAll`] query for the given site, period and
    /// optional segment.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client query produces.
    pub async fn get_all(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ReferrerAll>> {
        let request = ReferrersGetAll {
            id_site: id_site.into(),
            period,
            segment,
        };
        self.client.query(request).await
    }
}

/// Handle for the `API` module.
///
/// Holds a borrowed [`MatomoClient`] for the lifetime `'a`. The handle is
/// `Copy`, so it can be passed around by value.
#[derive(Clone, Copy)]
pub struct ApiHandle<'a> {
    /// Client through which this handle's calls are issued.
    client: &'a MatomoClient,
}

impl<'a> ApiHandle<'a> {
    /// Wrap a borrowed client in an `API` handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Call `API.getMatomoVersion` and return the string stored under the
    /// `"value"` key of the response.
    ///
    /// NOTE(review): when the response has no string `"value"` entry this
    /// returns an empty string rather than an error, so a malformed response
    /// is indistinguishable from an empty version. Confirm this is intended.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client call produces.
    pub async fn version(&self) -> Result<String> {
        let no_params = Params::new();
        let response = self
            .client
            .call("API.getMatomoVersion", &no_params)
            .await?;
        let version = response
            .get("value")
            .and_then(Value::as_str)
            .unwrap_or_default();
        Ok(version.to_string())
    }

    /// Raw report metadata as a JSON value, obtained from
    /// `API.getReportMetadata` with no extra parameters.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client call produces.
    pub async fn report_metadata(&self) -> Result<Value> {
        let no_params = Params::new();
        self.client.call("API.getReportMetadata", &no_params).await
    }

    /// Compose multiple calls into a single `API.getBulkRequest`.
    ///
    /// Each `(method, params)` pair is encoded with `Params::to_bulk_query`
    /// and stored under the key `urls[i]`, where `i` is the pair's position
    /// in `calls`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying client call produces.
    pub async fn bulk_request(&self, calls: &[(&str, Params)]) -> Result<Value> {
        let bulk = calls
            .iter()
            .enumerate()
            .fold(Params::new(), |acc, (idx, (method, call_params))| {
                acc.set(format!("urls[{idx}]"), call_params.to_bulk_query(method))
            });
        self.client.call("API.getBulkRequest", &bulk).await
    }
}

/// Handle for the `Live` module.
///
/// Holds a borrowed [`MatomoClient`] for the lifetime `'a`. The handle is
/// `Copy`, so it can be passed around by value.
#[derive(Clone, Copy)]
pub struct LiveHandle<'a> {
    /// Client through which this handle's queries are issued.
    client: &'a MatomoClient,
}

/// Serializable paging context for resuming a `Live.getLastVisitsDetails`
/// export across restarts.
///
/// All fields are public and the type is `Deserialize`, so a cursor can be
/// built without going through [`Cursor::new`] and its validation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Cursor {
    /// Site id; sent as a single-site `IdSite` when the request is built.
    pub id_site: u32,
    /// Value sent as the `period` request parameter.
    pub period: String,
    /// Value sent as the `date` request parameter.
    pub date: String,
    /// Value sent as the `segment` request parameter; omitted when `None`.
    pub segment: Option<String>,
    /// Value sent as `filter_limit`; also the amount by which `offset`
    /// advances after each non-empty page.
    pub page_size: u32,
    /// Offset of the page this cursor points at; starts at 0.
    pub offset: u32,
}

impl Cursor {
    /// Build the initial cursor for a paged export. Rejects `Limit::All`, which
    /// has no termination guarantee in the paging path.
    ///
    /// The returned cursor starts at offset 0 and uses the count carried by
    /// `Limit::Count` as its page size. `period` is split via
    /// `Period::to_params` into the `period` and `date` strings stored on the
    /// cursor.
    ///
    /// # Errors
    ///
    /// Returns `Error::Config` when `page_size` is `Limit::All`.
    pub fn new(
        id_site: u32,
        period: Period,
        page_size: Limit,
        segment: Option<Segment>,
    ) -> Result<Self> {
        // One exhaustive match: the compiler proves every variant is handled,
        // so no `unreachable!()` panic path is needed in this constructor.
        let page_size = match page_size {
            Limit::Count(n) => n.get(),
            Limit::All => {
                return Err(Error::Config(
                    "Limit::All cannot be used for paging; it breaks termination".to_string(),
                ));
            }
        };
        let (period, date) = period.to_params();
        Ok(Cursor {
            id_site,
            period: period.to_string(),
            date,
            segment: segment.map(|s| s.0),
            page_size,
            offset: 0,
        })
    }

    /// Translate this cursor into the request parameters for one page:
    /// site id, `period`, `date`, `filter_limit` (the page size), the current
    /// offset, and `segment` when one is set.
    fn to_params(&self) -> Params {
        let mut params = Params::new()
            .id_site(IdSite::Single(self.id_site))
            .set("period", self.period.clone())
            .set("date", self.date.clone())
            .set("filter_limit", self.page_size.to_string())
            .offset(self.offset);
        if let Some(s) = &self.segment {
            params = params.set("segment", s.clone());
        }
        params
    }
}

impl<'a> LiveHandle<'a> {
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        LiveHandle { client }
    }

    /// Fetch one page. An empty page is the authoritative terminator (returns a
    /// `None` next cursor); a short non-empty page is NOT.
    pub async fn next_page(&self, cursor: &Cursor) -> Result<(Vec<Visit>, Option<Cursor>)> {
        let visits: Vec<Visit> = self
            .client
            .query(LiveGetLastVisitsDetails {
                params: cursor.to_params(),
            })
            .await?;

        if visits.is_empty() {
            return Ok((visits, None));
        }
        let next = Cursor {
            offset: cursor.offset + cursor.page_size,
            ..cursor.clone()
        };
        Ok((visits, Some(next)))
    }

    /// Build a `VisitStream` over a paged export. Owns an `Arc`-cloned client so
    /// the stream is `'static + Send` and can be spawned.
    pub fn stream(
        &self,
        id_site: u32,
        period: Period,
        page_size: Limit,
        segment: Option<Segment>,
    ) -> Result<VisitStream> {
        let cursor = Cursor::new(id_site, period, page_size, segment)?;
        Ok(VisitStream::new(self.client.clone(), cursor))
    }
}

/// State threaded through the `try_unfold` closure that drives `VisitStream`.
struct StreamState {
    /// Owned client used to fetch each page.
    client: MatomoClient,
    /// Cursor for the next page to request; `None` once there is nothing
    /// further to fetch.
    cursor: Option<Cursor>,
    /// Visits already fetched but not yet yielded, in arrival order.
    buffer: std::collections::VecDeque<Visit>,
}

/// A `Stream` of visits backed by the resumable pager. On the first error the
/// stream ends.
///
/// Wraps a pinned, boxed, `Send` stream of `Result<Visit>` items. The boxed
/// trait object carries no borrowed lifetime, so the stream is `'static`.
pub struct VisitStream(Pin<Box<dyn Stream<Item = Result<Visit>> + Send>>);

impl VisitStream {
    /// Build the stream from an owned client and the cursor of the first page.
    ///
    /// Each step yields a buffered visit if one is available. Otherwise it
    /// fetches the page at the stored cursor, yields that page's first visit
    /// and buffers the rest. The stream finishes when no cursor is left or a
    /// fetched page is empty.
    fn new(client: MatomoClient, cursor: Cursor) -> Self {
        let initial = StreamState {
            client,
            cursor: Some(cursor),
            buffer: std::collections::VecDeque::new(),
        };
        let inner = futures_util::stream::try_unfold(initial, |mut st| async move {
            // Drain what the previous page left behind before fetching again.
            if let Some(visit) = st.buffer.pop_front() {
                return Ok(Some((visit, st)));
            }

            // The buffer is empty from here on; without a cursor we are done.
            let Some(current) = st.cursor.take() else {
                return Ok(None);
            };

            let (page, following) = st.client.live().next_page(&current).await?;
            st.cursor = following;

            // An empty page terminates the stream. Otherwise hand out the first
            // visit now and queue the remainder for later steps.
            let mut page = page.into_iter();
            match page.next() {
                None => Ok(None),
                Some(first) => {
                    st.buffer.extend(page);
                    Ok(Some((first, st)))
                }
            }
        });
        VisitStream(Box::pin(inner))
    }
}

impl Stream for VisitStream {
    type Item = Result<Visit>;

    /// Forward the poll to the boxed inner stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `VisitStream` only holds a `Pin<Box<..>>`, which is `Unpin`, so the
        // outer pin can be unwrapped to reach the inner pinned stream.
        let this = self.get_mut();
        this.0.as_mut().poll_next(cx)
    }
}