Skip to main content

canvas_lms_api/
pagination.rs

1use crate::{error::Result, http::Requester};
2use serde::de::DeserializeOwned;
3use std::{
4    collections::VecDeque,
5    future::Future,
6    marker::PhantomData,
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10};
11use url::Url;
12
/// Shared post-processing hook: receives each freshly deserialized item together
/// with a handle to the requester and returns the item that is actually buffered.
type InjectFn<T> = Arc<dyn Fn(T, Arc<Requester>) -> T + Send + Sync>;
/// Boxed, pinned future for one in-flight page fetch; resolves to the page's
/// items and the URL of the following page, if any.
type PendingFetch<T> = Pin<Box<dyn Future<Output = Result<(VecDeque<T>, Option<Url>)>> + Send>>;
15
16/// An async stream of Canvas API resources, fetched lazily page by page.
17///
18/// Collect all items with [`PageStream::collect_all`], or use
19/// [`futures::StreamExt`] methods (`.next()`, `.map()`, `.filter()`, etc.)
20/// directly — `PageStream` implements [`futures::Stream`] when the `async`
21/// feature is enabled (the default).
22///
23/// ```no_run
24/// # async fn example(stream: canvas_lms_api::PageStream<()>) -> canvas_lms_api::Result<()> {
25/// // Eager collection into a Vec
26/// let items: Vec<()> = stream.collect_all().await?;
27/// # Ok(()) }
28/// ```
29pub struct PageStream<T> {
30    requester: Arc<Requester>,
31    next_url: Option<Url>,
32    params: Vec<(String, String)>,
33    buffer: VecDeque<T>,
34    inject_fn: Option<InjectFn<T>>,
35    _phantom: PhantomData<T>,
36    pending: Option<PendingFetch<T>>,
37}
38
39impl<T: DeserializeOwned> PageStream<T> {
40    pub(crate) fn new(
41        requester: Arc<Requester>,
42        endpoint: &str,
43        mut params: Vec<(String, String)>,
44    ) -> Self {
45        if !params.iter().any(|(k, _)| k == "per_page") {
46            params.push(("per_page".into(), "100".into()));
47        }
48        let next_url = requester.base_url.join(endpoint).ok();
49        Self {
50            requester,
51            next_url,
52            params,
53            buffer: VecDeque::new(),
54            inject_fn: None,
55            _phantom: PhantomData,
56            pending: None,
57        }
58    }
59
60    pub(crate) fn new_with_injector(
61        requester: Arc<Requester>,
62        endpoint: &str,
63        mut params: Vec<(String, String)>,
64        inject: impl Fn(T, Arc<Requester>) -> T + Send + Sync + 'static,
65    ) -> Self {
66        if !params.iter().any(|(k, _)| k == "per_page") {
67            params.push(("per_page".into(), "100".into()));
68        }
69        let next_url = requester.base_url.join(endpoint).ok();
70        Self {
71            requester,
72            next_url,
73            params,
74            buffer: VecDeque::new(),
75            inject_fn: Some(Arc::new(inject)),
76            _phantom: PhantomData,
77            pending: None,
78        }
79    }
80
81    #[cfg(feature = "new-quizzes")]
82    pub(crate) fn new_with_injector_nq(
83        requester: Arc<Requester>,
84        endpoint: &str,
85        mut params: Vec<(String, String)>,
86        inject: impl Fn(T, Arc<Requester>) -> T + Send + Sync + 'static,
87    ) -> Self {
88        if !params.iter().any(|(k, _)| k == "per_page") {
89            params.push(("per_page".into(), "100".into()));
90        }
91        let next_url = requester.new_quizzes_url.join(endpoint).ok();
92        Self {
93            requester,
94            next_url,
95            params,
96            buffer: VecDeque::new(),
97            inject_fn: Some(Arc::new(inject)),
98            _phantom: PhantomData,
99            pending: None,
100        }
101    }
102
103    /// Collect all items across all pages into a Vec.
104    pub async fn collect_all(mut self) -> Result<Vec<T>> {
105        let mut out = Vec::new();
106        loop {
107            while let Some(item) = self.buffer.pop_front() {
108                out.push(item);
109            }
110            match self.next_url.take() {
111                None => break,
112                Some(url) => {
113                    let (items, next_url) = fetch_page(
114                        Arc::clone(&self.requester),
115                        url,
116                        self.params.clone(),
117                        self.inject_fn.clone(),
118                    )
119                    .await?;
120                    self.buffer = items;
121                    self.next_url = next_url;
122                }
123            }
124        }
125        while let Some(item) = self.buffer.pop_front() {
126            out.push(item);
127        }
128        Ok(out)
129    }
130}
131
132/// Fetch one page from `url`, apply the injector, return buffered items + next URL.
133async fn fetch_page<T: DeserializeOwned>(
134    requester: Arc<Requester>,
135    url: Url,
136    params: Vec<(String, String)>,
137    inject_fn: Option<InjectFn<T>>,
138) -> Result<(VecDeque<T>, Option<Url>)> {
139    let resp = requester.get_raw(url, &params).await?;
140
141    let next_url = parse_link_next(resp.headers());
142
143    let body: serde_json::Value = resp.json().await?;
144
145    // Support meta.pagination.next fallback.
146    let next_url = if next_url.is_none() {
147        body.get("meta")
148            .and_then(|m| m.get("pagination"))
149            .and_then(|p| p.get("next"))
150            .and_then(|n| n.as_str())
151            .and_then(|s| Url::parse(s).ok())
152    } else {
153        next_url
154    };
155
156    let items = match body {
157        serde_json::Value::Array(arr) => arr,
158        serde_json::Value::Object(ref obj) => obj
159            .values()
160            .find_map(|v| v.as_array().cloned())
161            .unwrap_or_default(),
162        _ => vec![],
163    };
164
165    let mut buffer = VecDeque::new();
166    for item in items {
167        let resource: T = serde_json::from_value(item)?;
168        let resource = if let Some(f) = &inject_fn {
169            f(resource, Arc::clone(&requester))
170        } else {
171            resource
172        };
173        buffer.push_back(resource);
174    }
175
176    Ok((buffer, next_url))
177}
178
// `PageStream` never relies on its own address staying fixed: the only future
// it owns, `pending`, is stored as `Pin<Box<..>>`, so moving the stream does not
// move that future. Declaring `Unpin` for every `T` is what allows `poll_next`
// to call `Pin::get_mut` regardless of the item type.
impl<T> Unpin for PageStream<T> {}
182
#[cfg(feature = "async")]
impl<T: DeserializeOwned + Send + 'static> futures::Stream for PageStream<T> {
    type Item = Result<T>;

    /// Yield the next item, fetching further pages on demand.
    ///
    /// Each pass of the loop does exactly one of: hand out a buffered item,
    /// advance the in-flight fetch, start a fetch for the next page, or end
    /// the stream when nothing is buffered, in flight, or left to request.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            // Buffered items always take priority over network activity.
            if let Some(ready) = this.buffer.pop_front() {
                return Poll::Ready(Some(Ok(ready)));
            }

            match this.pending.as_mut().map(|fetch| fetch.as_mut().poll(cx)) {
                // Fetch still running: the waker in `cx` has been registered.
                Some(Poll::Pending) => return Poll::Pending,
                // Fetch finished: drop the future, then store or report its outcome.
                Some(Poll::Ready(outcome)) => {
                    this.pending = None;
                    match outcome {
                        Ok((page, following)) => {
                            this.buffer = page;
                            this.next_url = following;
                        }
                        Err(err) => return Poll::Ready(Some(Err(err))),
                    }
                }
                // Nothing in flight: request the next page, or finish.
                None => match this.next_url.take() {
                    Some(url) => {
                        this.pending = Some(Box::pin(fetch_page(
                            Arc::clone(&this.requester),
                            url,
                            this.params.clone(),
                            this.inject_fn.clone(),
                        )));
                    }
                    None => return Poll::Ready(None),
                },
            }
        }
    }
}
222
223fn parse_link_next(headers: &reqwest::header::HeaderMap) -> Option<Url> {
224    let link = headers.get("Link")?.to_str().ok()?;
225    for part in link.split(',') {
226        let mut url_part = None;
227        let mut rel = None;
228        for segment in part.split(';') {
229            let s = segment.trim();
230            if s.starts_with('<') && s.ends_with('>') {
231                url_part = Some(&s[1..s.len() - 1]);
232            } else if s.starts_with("rel=") {
233                rel = Some(s.trim_start_matches("rel=").trim_matches('"'));
234            }
235        }
236        if rel == Some("next") {
237            if let Some(u) = url_part {
238                return Url::parse(u).ok();
239            }
240        }
241    }
242    None
243}