use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::endpoints::{
ActionsGetDownloads, ActionsGetOutlinks, ActionsGetPageTitles, ActionsGetPageUrls,
LiveGetLastVisitsDetails, ReferrersGetAll, ReferrersGetReferrerType, VisitsSummaryGet,
};
use crate::error::{Error, Result};
use crate::models::{
ActionPage, Download, Outlink, ReferrerAll, ReferrerType, Visit, VisitsSummary,
};
use crate::params::{IdSite, Limit, Period, Segment};
use crate::request::Params;
use crate::reqwest::MatomoClient;
/// Borrowed handle through which `VisitsSummaryGet` queries are issued.
///
/// Holds only a shared reference to the client, so the handle itself is
/// `Clone` and `Copy`.
#[derive(Clone, Copy)]
pub struct VisitsSummaryHandle<'a> {
// Client every query made through this handle is sent with.
client: &'a MatomoClient,
}
impl<'a> VisitsSummaryHandle<'a> {
    /// Crate-internal constructor: wraps a borrowed client in a handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Sends a `VisitsSummaryGet` query and returns the decoded summary.
    ///
    /// * `id_site` - any value convertible into an [`IdSite`].
    /// * `period` - forwarded to the query unchanged.
    /// * `segment` - optional segment, forwarded unchanged.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn get(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<VisitsSummary> {
        let id_site = id_site.into();
        let request = VisitsSummaryGet {
            id_site,
            period,
            segment,
        };
        self.client.query(request).await
    }
}
/// Borrowed handle through which the `ActionsGet*` queries are issued.
///
/// Holds only a shared reference to the client, so the handle itself is
/// `Clone` and `Copy`.
#[derive(Clone, Copy)]
pub struct ActionsHandle<'a> {
// Client every query made through this handle is sent with.
client: &'a MatomoClient,
}
impl<'a> ActionsHandle<'a> {
    /// Crate-internal constructor: wraps a borrowed client in a handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Sends an `ActionsGetPageUrls` query and returns the decoded rows.
    ///
    /// `id_site` is converted into an [`IdSite`]; `period` and `segment` are
    /// forwarded unchanged.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn get_page_urls(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ActionPage>> {
        let id_site = id_site.into();
        let request = ActionsGetPageUrls {
            id_site,
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Sends an `ActionsGetPageTitles` query and returns the decoded rows.
    ///
    /// `id_site` is converted into an [`IdSite`]; `period` and `segment` are
    /// forwarded unchanged.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn get_page_titles(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ActionPage>> {
        let id_site = id_site.into();
        let request = ActionsGetPageTitles {
            id_site,
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Sends an `ActionsGetDownloads` query and returns the decoded rows.
    ///
    /// `id_site` is converted into an [`IdSite`]; `period` and `segment` are
    /// forwarded unchanged.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn get_downloads(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<Download>> {
        let id_site = id_site.into();
        let request = ActionsGetDownloads {
            id_site,
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Sends an `ActionsGetOutlinks` query and returns the decoded rows.
    ///
    /// `id_site` is converted into an [`IdSite`]; `period` and `segment` are
    /// forwarded unchanged.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn get_outlinks(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<Outlink>> {
        let id_site = id_site.into();
        let request = ActionsGetOutlinks {
            id_site,
            period,
            segment,
        };
        self.client.query(request).await
    }
}
/// Borrowed handle through which the `ReferrersGet*` queries are issued.
///
/// Holds only a shared reference to the client, so the handle itself is
/// `Clone` and `Copy`.
#[derive(Clone, Copy)]
pub struct ReferrersHandle<'a> {
// Client every query made through this handle is sent with.
client: &'a MatomoClient,
}
impl<'a> ReferrersHandle<'a> {
    /// Crate-internal constructor: wraps a borrowed client in a handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Sends a `ReferrersGetReferrerType` query and returns the decoded rows.
    ///
    /// `id_site` is converted into an [`IdSite`]; `period` and `segment` are
    /// forwarded unchanged.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn get_referrer_type(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ReferrerType>> {
        let id_site = id_site.into();
        let request = ReferrersGetReferrerType {
            id_site,
            period,
            segment,
        };
        self.client.query(request).await
    }

    /// Sends a `ReferrersGetAll` query and returns the decoded rows.
    ///
    /// `id_site` is converted into an [`IdSite`]; `period` and `segment` are
    /// forwarded unchanged.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn get_all(
        &self,
        id_site: impl Into<IdSite>,
        period: Period,
        segment: Option<Segment>,
    ) -> Result<Vec<ReferrerAll>> {
        let id_site = id_site.into();
        let request = ReferrersGetAll {
            id_site,
            period,
            segment,
        };
        self.client.query(request).await
    }
}
/// Borrowed handle through which the `API.*` methods are called.
///
/// Holds only a shared reference to the client, so the handle itself is
/// `Clone` and `Copy`.
#[derive(Clone, Copy)]
pub struct ApiHandle<'a> {
// Client every call made through this handle is sent with.
client: &'a MatomoClient,
}
impl<'a> ApiHandle<'a> {
    /// Crate-internal constructor: wraps a borrowed client in a handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Calls `API.getMatomoVersion` with no parameters and returns the
    /// string stored under the `"value"` key of the response.
    ///
    /// If the response has no `"value"` key, or that key does not hold a
    /// string, an empty `String` is returned rather than an error.
    ///
    /// # Errors
    /// Returns whatever error the client's `call` produces.
    pub async fn version(&self) -> Result<String> {
        let response = self
            .client
            .call("API.getMatomoVersion", &Params::new())
            .await?;
        let version = response
            .get("value")
            .and_then(Value::as_str)
            .unwrap_or_default();
        Ok(version.to_string())
    }

    /// Calls `API.getReportMetadata` with no parameters and returns the raw
    /// JSON response.
    ///
    /// # Errors
    /// Returns whatever error the client's `call` produces.
    pub async fn report_metadata(&self) -> Result<Value> {
        self.client
            .call("API.getReportMetadata", &Params::new())
            .await
    }

    /// Calls `API.getBulkRequest` with one `urls[i]` parameter per entry of
    /// `calls`, in slice order.
    ///
    /// Each entry is a `(method, params)` pair; its `urls[i]` value is the
    /// result of `params.to_bulk_query(method)`.
    ///
    /// # Errors
    /// Returns whatever error the client's `call` produces.
    pub async fn bulk_request(&self, calls: &[(&str, Params)]) -> Result<Value> {
        let mut params = Params::new();
        for (i, (method, p)) in calls.iter().enumerate() {
            // `set` consumes and returns the builder, so rebind on each step.
            params = params.set(format!("urls[{i}]"), p.to_bulk_query(method));
        }
        self.client.call("API.getBulkRequest", &params).await
    }
}
/// Borrowed handle through which `LiveGetLastVisitsDetails` queries are issued.
///
/// Holds only a shared reference to the client, so the handle itself is
/// `Clone` and `Copy`.
#[derive(Clone, Copy)]
pub struct LiveHandle<'a> {
// Client every query made through this handle is sent with.
client: &'a MatomoClient,
}
/// Serializable paging position for visit queries.
///
/// All fields are plain integers and strings, so a cursor can be stored and
/// later passed back to `LiveHandle::next_page`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Cursor {
/// Site identifier; sent as `IdSite::Single`.
pub id_site: u32,
/// Value sent as the `period` request parameter.
pub period: String,
/// Value sent as the `date` request parameter.
pub date: String,
/// Optional value sent as the `segment` request parameter.
pub segment: Option<String>,
/// Value sent as the `filter_limit` request parameter.
pub page_size: u32,
/// Offset passed to the request builder; starts at 0 in `Cursor::new`.
pub offset: u32,
}
impl Cursor {
    /// Builds a cursor positioned at offset 0.
    ///
    /// * `id_site` - site identifier stored as-is.
    /// * `period` - split via `Period::to_params` into the stored `period`
    ///   and `date` strings.
    /// * `page_size` - must be `Limit::Count`; its value becomes the page size.
    /// * `segment` - optional segment whose inner string is stored.
    ///
    /// # Errors
    /// Returns `Error::Config` when `page_size` is `Limit::All`, since an
    /// unbounded page size cannot be used for paging.
    pub fn new(
        id_site: u32,
        period: Period,
        page_size: Limit,
        segment: Option<Segment>,
    ) -> Result<Self> {
        // One exhaustive match both rejects `Limit::All` and extracts the
        // count, so there is no separate guard and no unreachable arm.
        let page_size = match page_size {
            Limit::Count(n) => n.get(),
            Limit::All => {
                return Err(Error::Config(
                    "Limit::All cannot be used for paging; it breaks termination".to_string(),
                ));
            }
        };
        let (period, date) = period.to_params();
        Ok(Cursor {
            id_site,
            period: period.to_string(),
            date,
            segment: segment.map(|s| s.0),
            page_size,
            offset: 0,
        })
    }

    /// Renders this cursor as request parameters: site, `period`, `date`,
    /// `filter_limit` (the page size), the offset, and `segment` when set.
    fn to_params(&self) -> Params {
        let base = Params::new()
            .id_site(IdSite::Single(self.id_site))
            .set("period", self.period.clone())
            .set("date", self.date.clone())
            .set("filter_limit", self.page_size.to_string())
            .offset(self.offset);
        match &self.segment {
            Some(segment) => base.set("segment", segment.clone()),
            None => base,
        }
    }
}
impl<'a> LiveHandle<'a> {
    /// Crate-internal constructor: wraps a borrowed client in a handle.
    pub(crate) fn new(client: &'a MatomoClient) -> Self {
        Self { client }
    }

    /// Fetches the page of visits that `cursor` points at.
    ///
    /// Returns the visits together with the cursor for the following page.
    /// The returned cursor is `None` when:
    ///
    /// * the page came back empty, or
    /// * advancing `offset` by `page_size` would overflow `u32`, in which
    ///   case the fetched visits are still returned and paging ends.
    ///
    /// # Errors
    /// Returns whatever error the client's `query` call produces.
    pub async fn next_page(&self, cursor: &Cursor) -> Result<(Vec<Visit>, Option<Cursor>)> {
        let visits: Vec<Visit> = self
            .client
            .query(LiveGetLastVisitsDetails {
                params: cursor.to_params(),
            })
            .await?;
        if visits.is_empty() {
            return Ok((visits, None));
        }
        // A plain `+` would panic in debug builds and wrap in release builds,
        // sending paging back to a low offset; stop paging instead.
        let next = cursor
            .offset
            .checked_add(cursor.page_size)
            .map(|offset| Cursor {
                offset,
                ..cursor.clone()
            });
        Ok((visits, next))
    }

    /// Builds a [`VisitStream`] that yields visits one at a time, starting
    /// from offset 0.
    ///
    /// The stream owns a clone of the client.
    ///
    /// # Errors
    /// Returns the error from `Cursor::new` (e.g. when `page_size` is
    /// `Limit::All`).
    pub fn stream(
        &self,
        id_site: u32,
        period: Period,
        page_size: Limit,
        segment: Option<Segment>,
    ) -> Result<VisitStream> {
        let cursor = Cursor::new(id_site, period, page_size, segment)?;
        Ok(VisitStream::new(self.client.clone(), cursor))
    }
}
// State threaded through the steps of the stream built in `VisitStream::new`.
struct StreamState {
// Owned client used to fetch further pages.
client: MatomoClient,
// Cursor for the next page to request; `None` once no further page will be fetched.
cursor: Option<Cursor>,
// Visits already fetched but not yet yielded, in the order received.
buffer: std::collections::VecDeque<Visit>,
}
/// Stream of `Result<Visit>` items, stored as a pinned, boxed, `Send` trait object.
pub struct VisitStream(Pin<Box<dyn Stream<Item = Result<Visit>> + Send>>);
impl VisitStream {
    /// Builds the stream from an owned client and a starting cursor.
    ///
    /// Each step yields one buffered visit. When the buffer is empty, the next
    /// page is fetched through `next_page` and buffered. The stream ends when
    /// no cursor remains or a fetched page is empty; a fetch error is yielded
    /// as an `Err` item.
    fn new(client: MatomoClient, cursor: Cursor) -> Self {
        let initial = StreamState {
            client,
            cursor: Some(cursor),
            buffer: std::collections::VecDeque::new(),
        };
        let stream = futures_util::stream::try_unfold(initial, |mut st| async move {
            loop {
                // Serve from the buffer first; fetch only when it is drained.
                match st.buffer.pop_front() {
                    Some(visit) => return Ok(Some((visit, st))),
                    None => {}
                }
                let cursor = match st.cursor.take() {
                    Some(cursor) => cursor,
                    None => return Ok(None),
                };
                let (page, following) = st.client.live().next_page(&cursor).await?;
                st.cursor = following;
                if page.is_empty() {
                    return Ok(None);
                }
                st.buffer.extend(page);
            }
        });
        VisitStream(Box::pin(stream))
    }
}
impl Stream for VisitStream {
    type Item = Result<Visit>;

    /// Delegates polling to the boxed inner stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `VisitStream` only holds a `Pin<Box<..>>`, so it is `Unpin` and the
        // pinned reference can be unwrapped to reach the inner stream.
        let this = self.get_mut();
        this.0.as_mut().poll_next(cx)
    }
}