sms_client/http/
paginator.rs

1//! HTTP request paginator, supporting lazy traversal across large sets
2
3use crate::http::types::HttpPaginationOptions;
4use crate::http::error::*;
5
6/// Call a function with an update HttpPaginationOptions for each batch request,
7/// simplifying lazy access to large response sets such as messages etc.
8pub struct HttpPaginator<T, F, Fut> {
9    http_fn: F,
10    pagination: HttpPaginationOptions,
11    current_batch: Vec<T>,
12    current_index: usize,
13    has_more: bool,
14    initial_limit: u64,
15    _phantom: std::marker::PhantomData<Fut>
16}
17impl<T, F, Fut> HttpPaginator<T, F, Fut>
18where
19    F: Fn(Option<HttpPaginationOptions>) -> Fut,
20    Fut: Future<Output = HttpResult<Vec<T>>>
21{
22
23    /// Create the paginator with the http batch generator.
24    ///
25    /// # Example
26    /// ```rust
27    /// use sms_client::Client;
28    /// use sms_client::config::ClientConfig;
29    /// use sms_client::http::paginator::HttpPaginator;
30    /// use sms_client::http::types::HttpPaginationOptions;
31    ///
32    /// let http = Client::new(ClientConfig::http_only("http://localhost:3000").with_auth("token!"))?.http_arc();
33    /// let mut paginator = HttpPaginator::new(
34    ///     move |pagination| {
35    ///         let http = http.clone();
36    ///         async move {
37    ///             http.get_latest_numbers(pagination).await
38    ///         }
39    ///     },
40    ///     HttpPaginationOptions::default()
41    ///         .with_limit(10) // Do it in batches of 10.
42    ///         .with_offset(10) // Skip the first 10 results.
43    ///         .with_reverse(true) // Reverse the results set.
44    /// );
45    /// ```
46    pub fn new(http_fn: F, pagination: HttpPaginationOptions) -> Self {
47        let initial_limit = pagination.limit.unwrap_or(50);
48
49        Self {
50            http_fn,
51            pagination,
52            current_batch: Vec::new(),
53            current_index: 0,
54            has_more: true,
55            initial_limit,
56            _phantom: std::marker::PhantomData
57        }
58    }
59
60    /// Create a paginator with default pagination settings.
61    /// This starts at offset 0 with a limit of 50 per page.
62    ///
63    /// # Example
64    /// ```rust
65    /// use sms_client::http;
66    /// use sms_client::Client;
67    /// use sms_client::config::ClientConfig;
68    /// use sms_client::http::HttpClient;
69    /// use sms_client::http::paginator::HttpPaginator;
70    ///
71    /// /// View all latest numbers, in a default paginator with a limit of 50 per chunk.
72    /// async fn view_all_latest_numbers(http: HttpClient) {
73    ///     let mut paginator = HttpPaginator::with_defaults(|pagination| {
74    ///         http.get_latest_numbers(pagination)
75    ///     });
76    ///     while let Some(message) = paginator.next().await {
77    ///         log::info!("{:?}", message);
78    ///     }
79    /// }
80    /// ```
81    pub fn with_defaults(http_fn: F) -> Self {
82        Self::new(
83            http_fn,
84            HttpPaginationOptions::default()
85                .with_limit(50)
86                .with_offset(0)
87        )
88    }
89
90    /// Fetch the next batch of items from the API.
91    async fn fetch_next_batch(&mut self) -> HttpResult<bool> {
92        log::trace!("Fetching next batch: {:?}", self.pagination);
93        let response = (self.http_fn)(Some(self.pagination.clone())).await?;
94
95        let received_count = response.len() as u64;
96        self.has_more = received_count >= self.initial_limit;
97
98        // If no more items have been received, we're definitely done.
99        if received_count == 0 {
100            self.has_more = false;
101            return Ok(false);
102        }
103
104        self.current_batch = response;
105        self.current_index = 0;
106
107        // Update offset for next request.
108        if let Some(current_offset) = self.pagination.offset {
109            self.pagination.offset = Some(current_offset + received_count);
110        } else {
111
112            // If no offset was set initially, start from the received count
113            self.pagination.offset = Some(received_count);
114        }
115
116        Ok(true)
117    }
118
119    /// Get the next item, automatically fetching next pages as needed.
120    ///
121    /// # Example
122    /// ```rust
123    /// use sms_client::http::HttpClient;
124    /// use sms_client::http::paginator::HttpPaginator;
125    ///
126    /// async fn get_delivery_reports(message_id: i64, http: HttpClient) {
127    ///     let mut paginator = HttpPaginator::with_defaults(|pagination| {
128    ///         http.get_delivery_reports(message_id, pagination)
129    ///     }).await;
130    ///
131    ///     /// Iterate through ALL messages, with a page size of 50 (default).
132    ///     while let Some(message) = paginator.next().await {
133    ///         log::info!("{:?}", message);
134    ///     }
135    /// }
136    /// ```
137    pub async fn next(&mut self) -> Option<T> {
138        if self.current_index >= self.current_batch.len() {
139
140            // If there aren't any-more, then there is nothing to fetch next.
141            if !self.has_more {
142                return None;
143            }
144
145            match self.fetch_next_batch().await {
146                Ok(true) => {}, // Successfully fetched more data
147                Ok(false) | Err(_) => return None // No more data or error
148            }
149        }
150
151        // Return the next item if available.
152        if self.current_index < self.current_batch.len() {
153            let item = self.current_batch.remove(0);
154            Some(item)
155        } else {
156            None
157        }
158    }
159
160    /// Collect all remaining items into a Vec.
161    /// This continues to request batches until empty.
162    pub async fn collect_all(mut self) -> HttpResult<Vec<T>> {
163        let mut all_items = Vec::new();
164
165        if self.current_batch.is_empty() && self.has_more {
166            self.fetch_next_batch().await?;
167        }
168
169        while let Some(item) = self.next().await {
170            all_items.push(item);
171        }
172
173        Ok(all_items)
174    }
175
176    /// Process items in chunks, calling the provided closure for each chunk.
177    pub async fn take(mut self, n: usize) -> HttpResult<Vec<T>> {
178        let mut items = Vec::with_capacity(n.min(100)); // Cap initial capacity
179
180        for _ in 0..n {
181            if let Some(item) = self.next().await {
182                items.push(item);
183            } else {
184                break;
185            }
186        }
187
188        Ok(items)
189    }
190
191    /// Process items in chunks, calling the provided closure for each chunk.
192    ///
193    /// # Example
194    /// ```rust
195    /// use std::sync::Arc;
196    /// use sms_client::http::HttpClient;
197    /// use sms_client::http::paginator::HttpPaginator;
198    /// use sms_client::http::types::HttpPaginationOptions;
199    ///
200    /// /// Read all messages from a phone number, in chunks of 10.
201    /// async fn read_all_messages(phone_number: &str, http: Arc<HttpClient>) {
202    ///     let paginator = HttpPaginator::with_defaults(|pagination| {
203    ///         http.get_messages(phone_number, pagination)
204    ///     }).await;
205    ///
206    ///     paginator.for_each_chuck(10, |batch| {
207    ///         for message in batch {
208    ///             log::info!("{:?}", message);
209    ///         }
210    ///     }).await?;
211    /// }
212    /// ```
213    pub async fn for_each_chuck<C>(mut self, chunk_size: usize, mut chunk_fn: C) -> HttpResult<()>
214    where
215        C: FnMut(&[T]) -> HttpResult<()>
216    {
217        let mut chunk = Vec::with_capacity(chunk_size);
218
219        while let Some(item) = self.next().await {
220            chunk.push(item);
221
222            if chunk.len() >= chunk_size {
223                chunk_fn(&chunk)?;
224                chunk.clear();
225            }
226        }
227
228        // Process any remaining items in the final chunk.
229        if !chunk.is_empty() {
230            chunk_fn(&chunk)?;
231        }
232
233        Ok(())
234    }
235
236    /// Skip `n` items and return the paginator.
237    pub async fn skip(mut self, n: usize) -> Self {
238        for _ in 0..n {
239            if self.next().await.is_none() {
240                break;
241            }
242        }
243        self
244    }
245
246    /// Get the current pagination options state.
247    pub fn current_pagination(&self) -> &HttpPaginationOptions {
248        &self.pagination
249    }
250
251    /// Check if there are potentially more items to fetch.
252    pub fn has_more(&self) -> bool {
253        self.has_more || self.current_index < self.current_batch.len()
254    }
255}