1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::Stream;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::endpoints::{
9 ActionsGetDownloads, ActionsGetOutlinks, ActionsGetPageTitles, ActionsGetPageUrls,
10 LiveGetLastVisitsDetails, ReferrersGetAll, ReferrersGetReferrerType, VisitsSummaryGet,
11};
12use crate::error::{Error, Result};
13use crate::models::{
14 ActionPage, Download, Outlink, ReferrerAll, ReferrerType, Visit, VisitsSummary,
15};
16use crate::params::{IdSite, Limit, Period, Segment};
17use crate::request::Params;
18use crate::reqwest::MatomoClient;
19
20#[derive(Clone, Copy)]
22pub struct VisitsSummaryHandle<'a> {
23 client: &'a MatomoClient,
24}
25
26impl<'a> VisitsSummaryHandle<'a> {
27 pub(crate) fn new(client: &'a MatomoClient) -> Self {
28 VisitsSummaryHandle { client }
29 }
30
31 pub async fn get(
32 &self,
33 id_site: impl Into<IdSite>,
34 period: Period,
35 segment: Option<Segment>,
36 ) -> Result<VisitsSummary> {
37 self.client
38 .query(VisitsSummaryGet {
39 id_site: id_site.into(),
40 period,
41 segment,
42 })
43 .await
44 }
45}
46
47#[derive(Clone, Copy)]
49pub struct ActionsHandle<'a> {
50 client: &'a MatomoClient,
51}
52
53impl<'a> ActionsHandle<'a> {
54 pub(crate) fn new(client: &'a MatomoClient) -> Self {
55 ActionsHandle { client }
56 }
57
58 pub async fn get_page_urls(
59 &self,
60 id_site: impl Into<IdSite>,
61 period: Period,
62 segment: Option<Segment>,
63 ) -> Result<Vec<ActionPage>> {
64 self.client
65 .query(ActionsGetPageUrls {
66 id_site: id_site.into(),
67 period,
68 segment,
69 })
70 .await
71 }
72
73 pub async fn get_page_titles(
74 &self,
75 id_site: impl Into<IdSite>,
76 period: Period,
77 segment: Option<Segment>,
78 ) -> Result<Vec<ActionPage>> {
79 self.client
80 .query(ActionsGetPageTitles {
81 id_site: id_site.into(),
82 period,
83 segment,
84 })
85 .await
86 }
87
88 pub async fn get_downloads(
89 &self,
90 id_site: impl Into<IdSite>,
91 period: Period,
92 segment: Option<Segment>,
93 ) -> Result<Vec<Download>> {
94 self.client
95 .query(ActionsGetDownloads {
96 id_site: id_site.into(),
97 period,
98 segment,
99 })
100 .await
101 }
102
103 pub async fn get_outlinks(
104 &self,
105 id_site: impl Into<IdSite>,
106 period: Period,
107 segment: Option<Segment>,
108 ) -> Result<Vec<Outlink>> {
109 self.client
110 .query(ActionsGetOutlinks {
111 id_site: id_site.into(),
112 period,
113 segment,
114 })
115 .await
116 }
117}
118
119#[derive(Clone, Copy)]
121pub struct ReferrersHandle<'a> {
122 client: &'a MatomoClient,
123}
124
125impl<'a> ReferrersHandle<'a> {
126 pub(crate) fn new(client: &'a MatomoClient) -> Self {
127 ReferrersHandle { client }
128 }
129
130 pub async fn get_referrer_type(
131 &self,
132 id_site: impl Into<IdSite>,
133 period: Period,
134 segment: Option<Segment>,
135 ) -> Result<Vec<ReferrerType>> {
136 self.client
137 .query(ReferrersGetReferrerType {
138 id_site: id_site.into(),
139 period,
140 segment,
141 })
142 .await
143 }
144
145 pub async fn get_all(
146 &self,
147 id_site: impl Into<IdSite>,
148 period: Period,
149 segment: Option<Segment>,
150 ) -> Result<Vec<ReferrerAll>> {
151 self.client
152 .query(ReferrersGetAll {
153 id_site: id_site.into(),
154 period,
155 segment,
156 })
157 .await
158 }
159}
160
161#[derive(Clone, Copy)]
163pub struct ApiHandle<'a> {
164 client: &'a MatomoClient,
165}
166
167impl<'a> ApiHandle<'a> {
168 pub(crate) fn new(client: &'a MatomoClient) -> Self {
169 ApiHandle { client }
170 }
171
172 pub async fn version(&self) -> Result<String> {
173 let value = self
174 .client
175 .call("API.getMatomoVersion", &Params::new())
176 .await?;
177 Ok(value
178 .get("value")
179 .and_then(Value::as_str)
180 .unwrap_or_default()
181 .to_string())
182 }
183
184 pub async fn report_metadata(&self) -> Result<Value> {
186 self.client
187 .call("API.getReportMetadata", &Params::new())
188 .await
189 }
190
191 pub async fn bulk_request(&self, calls: &[(&str, Params)]) -> Result<Value> {
193 let mut params = Params::new();
194 for (i, (method, p)) in calls.iter().enumerate() {
195 params = params.set(format!("urls[{i}]"), p.to_bulk_query(method));
196 }
197 self.client.call("API.getBulkRequest", ¶ms).await
198 }
199}
200
201#[derive(Clone, Copy)]
203pub struct LiveHandle<'a> {
204 client: &'a MatomoClient,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
210pub struct Cursor {
211 pub id_site: u32,
212 pub period: String,
213 pub date: String,
214 pub segment: Option<String>,
215 pub page_size: u32,
216 pub offset: u32,
217}
218
219impl Cursor {
220 pub fn new(
223 id_site: u32,
224 period: Period,
225 page_size: Limit,
226 segment: Option<Segment>,
227 ) -> Result<Self> {
228 if page_size.is_all() {
229 return Err(Error::Config(
230 "Limit::All cannot be used for paging; it breaks termination".to_string(),
231 ));
232 }
233 let page_size = match page_size {
234 Limit::Count(n) => n.get(),
235 Limit::All => unreachable!(),
236 };
237 let (period, date) = period.to_params();
238 Ok(Cursor {
239 id_site,
240 period: period.to_string(),
241 date,
242 segment: segment.map(|s| s.0),
243 page_size,
244 offset: 0,
245 })
246 }
247
248 fn to_params(&self) -> Params {
249 let mut params = Params::new()
250 .id_site(IdSite::Single(self.id_site))
251 .set("period", self.period.clone())
252 .set("date", self.date.clone())
253 .set("filter_limit", self.page_size.to_string())
254 .offset(self.offset);
255 if let Some(s) = &self.segment {
256 params = params.set("segment", s.clone());
257 }
258 params
259 }
260}
261
262impl<'a> LiveHandle<'a> {
263 pub(crate) fn new(client: &'a MatomoClient) -> Self {
264 LiveHandle { client }
265 }
266
267 pub async fn next_page(&self, cursor: &Cursor) -> Result<(Vec<Visit>, Option<Cursor>)> {
270 let visits: Vec<Visit> = self
271 .client
272 .query(LiveGetLastVisitsDetails {
273 params: cursor.to_params(),
274 })
275 .await?;
276
277 if visits.is_empty() {
278 return Ok((visits, None));
279 }
280 let next = Cursor {
281 offset: cursor.offset + cursor.page_size,
282 ..cursor.clone()
283 };
284 Ok((visits, Some(next)))
285 }
286
287 pub fn stream(
290 &self,
291 id_site: u32,
292 period: Period,
293 page_size: Limit,
294 segment: Option<Segment>,
295 ) -> Result<VisitStream> {
296 let cursor = Cursor::new(id_site, period, page_size, segment)?;
297 Ok(VisitStream::new(self.client.clone(), cursor))
298 }
299}
300
301struct StreamState {
302 client: MatomoClient,
303 cursor: Option<Cursor>,
304 buffer: std::collections::VecDeque<Visit>,
305}
306
307pub struct VisitStream(Pin<Box<dyn Stream<Item = Result<Visit>> + Send>>);
310
311impl VisitStream {
312 fn new(client: MatomoClient, cursor: Cursor) -> Self {
313 let state = StreamState {
314 client,
315 cursor: Some(cursor),
316 buffer: std::collections::VecDeque::new(),
317 };
318 let stream = futures_util::stream::try_unfold(state, |mut state| async move {
319 loop {
320 if let Some(visit) = state.buffer.pop_front() {
321 return Ok(Some((visit, state)));
322 }
323 let Some(cursor) = state.cursor.take() else {
324 return Ok(None);
325 };
326 let (visits, next) = state.client.live().next_page(&cursor).await?;
327 state.cursor = next;
328 if visits.is_empty() {
329 return Ok(None);
330 }
331 state.buffer.extend(visits);
332 }
333 });
334 VisitStream(Box::pin(stream))
335 }
336}
337
338impl Stream for VisitStream {
339 type Item = Result<Visit>;
340
341 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
342 self.0.as_mut().poll_next(cx)
343 }
344}