Skip to main content

matomo/reqwest/
handles.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::Stream;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::endpoints::{
9    ActionsGetDownloads, ActionsGetOutlinks, ActionsGetPageTitles, ActionsGetPageUrls,
10    LiveGetLastVisitsDetails, ReferrersGetAll, ReferrersGetReferrerType, VisitsSummaryGet,
11};
12use crate::error::{Error, Result};
13use crate::models::{
14    ActionPage, Download, Outlink, ReferrerAll, ReferrerType, Visit, VisitsSummary,
15};
16use crate::params::{IdSite, Limit, Period, Segment};
17use crate::request::Params;
18use crate::reqwest::MatomoClient;
19
20/// Handle for the `VisitsSummary` module.
21#[derive(Clone, Copy)]
22pub struct VisitsSummaryHandle<'a> {
23    client: &'a MatomoClient,
24}
25
26impl<'a> VisitsSummaryHandle<'a> {
27    pub(crate) fn new(client: &'a MatomoClient) -> Self {
28        VisitsSummaryHandle { client }
29    }
30
31    pub async fn get(
32        &self,
33        id_site: impl Into<IdSite>,
34        period: Period,
35        segment: Option<Segment>,
36    ) -> Result<VisitsSummary> {
37        self.client
38            .query(VisitsSummaryGet {
39                id_site: id_site.into(),
40                period,
41                segment,
42            })
43            .await
44    }
45}
46
47/// Handle for the `Actions` module.
48#[derive(Clone, Copy)]
49pub struct ActionsHandle<'a> {
50    client: &'a MatomoClient,
51}
52
53impl<'a> ActionsHandle<'a> {
54    pub(crate) fn new(client: &'a MatomoClient) -> Self {
55        ActionsHandle { client }
56    }
57
58    pub async fn get_page_urls(
59        &self,
60        id_site: impl Into<IdSite>,
61        period: Period,
62        segment: Option<Segment>,
63    ) -> Result<Vec<ActionPage>> {
64        self.client
65            .query(ActionsGetPageUrls {
66                id_site: id_site.into(),
67                period,
68                segment,
69            })
70            .await
71    }
72
73    pub async fn get_page_titles(
74        &self,
75        id_site: impl Into<IdSite>,
76        period: Period,
77        segment: Option<Segment>,
78    ) -> Result<Vec<ActionPage>> {
79        self.client
80            .query(ActionsGetPageTitles {
81                id_site: id_site.into(),
82                period,
83                segment,
84            })
85            .await
86    }
87
88    pub async fn get_downloads(
89        &self,
90        id_site: impl Into<IdSite>,
91        period: Period,
92        segment: Option<Segment>,
93    ) -> Result<Vec<Download>> {
94        self.client
95            .query(ActionsGetDownloads {
96                id_site: id_site.into(),
97                period,
98                segment,
99            })
100            .await
101    }
102
103    pub async fn get_outlinks(
104        &self,
105        id_site: impl Into<IdSite>,
106        period: Period,
107        segment: Option<Segment>,
108    ) -> Result<Vec<Outlink>> {
109        self.client
110            .query(ActionsGetOutlinks {
111                id_site: id_site.into(),
112                period,
113                segment,
114            })
115            .await
116    }
117}
118
119/// Handle for the `Referrers` module.
120#[derive(Clone, Copy)]
121pub struct ReferrersHandle<'a> {
122    client: &'a MatomoClient,
123}
124
125impl<'a> ReferrersHandle<'a> {
126    pub(crate) fn new(client: &'a MatomoClient) -> Self {
127        ReferrersHandle { client }
128    }
129
130    pub async fn get_referrer_type(
131        &self,
132        id_site: impl Into<IdSite>,
133        period: Period,
134        segment: Option<Segment>,
135    ) -> Result<Vec<ReferrerType>> {
136        self.client
137            .query(ReferrersGetReferrerType {
138                id_site: id_site.into(),
139                period,
140                segment,
141            })
142            .await
143    }
144
145    pub async fn get_all(
146        &self,
147        id_site: impl Into<IdSite>,
148        period: Period,
149        segment: Option<Segment>,
150    ) -> Result<Vec<ReferrerAll>> {
151        self.client
152            .query(ReferrersGetAll {
153                id_site: id_site.into(),
154                period,
155                segment,
156            })
157            .await
158    }
159}
160
161/// Handle for the `API` module.
162#[derive(Clone, Copy)]
163pub struct ApiHandle<'a> {
164    client: &'a MatomoClient,
165}
166
167impl<'a> ApiHandle<'a> {
168    pub(crate) fn new(client: &'a MatomoClient) -> Self {
169        ApiHandle { client }
170    }
171
172    pub async fn version(&self) -> Result<String> {
173        let value = self
174            .client
175            .call("API.getMatomoVersion", &Params::new())
176            .await?;
177        Ok(value
178            .get("value")
179            .and_then(Value::as_str)
180            .unwrap_or_default()
181            .to_string())
182    }
183
184    /// Raw report metadata as a JSON value.
185    pub async fn report_metadata(&self) -> Result<Value> {
186        self.client
187            .call("API.getReportMetadata", &Params::new())
188            .await
189    }
190
191    /// Compose multiple calls into a single `API.getBulkRequest`.
192    pub async fn bulk_request(&self, calls: &[(&str, Params)]) -> Result<Value> {
193        let mut params = Params::new();
194        for (i, (method, p)) in calls.iter().enumerate() {
195            params = params.set(format!("urls[{i}]"), p.to_bulk_query(method));
196        }
197        self.client.call("API.getBulkRequest", &params).await
198    }
199}
200
201/// Handle for the `Live` module.
202#[derive(Clone, Copy)]
203pub struct LiveHandle<'a> {
204    client: &'a MatomoClient,
205}
206
207/// Serializable paging context for resuming a `Live.getLastVisitsDetails`
208/// export across restarts.
209#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
210pub struct Cursor {
211    pub id_site: u32,
212    pub period: String,
213    pub date: String,
214    pub segment: Option<String>,
215    pub page_size: u32,
216    pub offset: u32,
217}
218
219impl Cursor {
220    /// Build the initial cursor for a paged export. Rejects `Limit::All`, which
221    /// has no termination guarantee in the paging path.
222    pub fn new(
223        id_site: u32,
224        period: Period,
225        page_size: Limit,
226        segment: Option<Segment>,
227    ) -> Result<Self> {
228        if page_size.is_all() {
229            return Err(Error::Config(
230                "Limit::All cannot be used for paging; it breaks termination".to_string(),
231            ));
232        }
233        let page_size = match page_size {
234            Limit::Count(n) => n.get(),
235            Limit::All => unreachable!(),
236        };
237        let (period, date) = period.to_params();
238        Ok(Cursor {
239            id_site,
240            period: period.to_string(),
241            date,
242            segment: segment.map(|s| s.0),
243            page_size,
244            offset: 0,
245        })
246    }
247
248    fn to_params(&self) -> Params {
249        let mut params = Params::new()
250            .id_site(IdSite::Single(self.id_site))
251            .set("period", self.period.clone())
252            .set("date", self.date.clone())
253            .set("filter_limit", self.page_size.to_string())
254            .offset(self.offset);
255        if let Some(s) = &self.segment {
256            params = params.set("segment", s.clone());
257        }
258        params
259    }
260}
261
262impl<'a> LiveHandle<'a> {
263    pub(crate) fn new(client: &'a MatomoClient) -> Self {
264        LiveHandle { client }
265    }
266
267    /// Fetch one page. An empty page is the authoritative terminator (returns a
268    /// `None` next cursor); a short non-empty page is NOT.
269    pub async fn next_page(&self, cursor: &Cursor) -> Result<(Vec<Visit>, Option<Cursor>)> {
270        let visits: Vec<Visit> = self
271            .client
272            .query(LiveGetLastVisitsDetails {
273                params: cursor.to_params(),
274            })
275            .await?;
276
277        if visits.is_empty() {
278            return Ok((visits, None));
279        }
280        let next = Cursor {
281            offset: cursor.offset + cursor.page_size,
282            ..cursor.clone()
283        };
284        Ok((visits, Some(next)))
285    }
286
287    /// Build a `VisitStream` over a paged export. Owns an `Arc`-cloned client so
288    /// the stream is `'static + Send` and can be spawned.
289    pub fn stream(
290        &self,
291        id_site: u32,
292        period: Period,
293        page_size: Limit,
294        segment: Option<Segment>,
295    ) -> Result<VisitStream> {
296        let cursor = Cursor::new(id_site, period, page_size, segment)?;
297        Ok(VisitStream::new(self.client.clone(), cursor))
298    }
299}
300
301struct StreamState {
302    client: MatomoClient,
303    cursor: Option<Cursor>,
304    buffer: std::collections::VecDeque<Visit>,
305}
306
307/// A `Stream` of visits backed by the resumable pager. On the first error the
308/// stream ends.
309pub struct VisitStream(Pin<Box<dyn Stream<Item = Result<Visit>> + Send>>);
310
311impl VisitStream {
312    fn new(client: MatomoClient, cursor: Cursor) -> Self {
313        let state = StreamState {
314            client,
315            cursor: Some(cursor),
316            buffer: std::collections::VecDeque::new(),
317        };
318        let stream = futures_util::stream::try_unfold(state, |mut state| async move {
319            loop {
320                if let Some(visit) = state.buffer.pop_front() {
321                    return Ok(Some((visit, state)));
322                }
323                let Some(cursor) = state.cursor.take() else {
324                    return Ok(None);
325                };
326                let (visits, next) = state.client.live().next_page(&cursor).await?;
327                state.cursor = next;
328                if visits.is_empty() {
329                    return Ok(None);
330                }
331                state.buffer.extend(visits);
332            }
333        });
334        VisitStream(Box::pin(stream))
335    }
336}
337
338impl Stream for VisitStream {
339    type Item = Result<Visit>;
340
341    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
342        self.0.as_mut().poll_next(cx)
343    }
344}