sms_client/http/paginator.rs
1//! HTTP request paginator, supporting lazy traversal across large sets
2
3use crate::http::error::HttpResult;
4use crate::http::types::HttpPaginationOptions;
5
6/// Call a function with an update `HttpPaginationOptions` for each batch request,
7/// simplifying lazy access to large response sets such as messages etc.
8pub struct HttpPaginator<T, F, Fut> {
9 http_fn: F,
10 pagination: HttpPaginationOptions,
11 current_batch: Vec<T>,
12 current_index: usize,
13 has_more: bool,
14 initial_limit: u64,
15 _phantom: std::marker::PhantomData<Fut>,
16}
17impl<T, F, Fut> HttpPaginator<T, F, Fut>
18where
19 F: Fn(Option<HttpPaginationOptions>) -> Fut,
20 Fut: Future<Output = HttpResult<Vec<T>>>,
21{
22 /// Create the paginator with the http batch generator.
23 ///
24 /// # Example
25 /// ```rust
26 /// use sms_client::Client;
27 /// use sms_client::config::ClientConfig;
28 /// use sms_client::http::paginator::HttpPaginator;
29 /// use sms_client::http::types::HttpPaginationOptions;
30 ///
31 /// let http = Client::new(ClientConfig::http_only("http://localhost:3000").with_auth("token!"))?.http_arc();
32 /// let mut paginator = HttpPaginator::new(
33 /// move |pagination| {
34 /// let http = http.clone();
35 /// async move {
36 /// http.get_latest_numbers(pagination).await
37 /// }
38 /// },
39 /// HttpPaginationOptions::default()
40 /// .with_limit(10) // Do it in batches of 10.
41 /// .with_offset(10) // Skip the first 10 results.
42 /// .with_reverse(true) // Reverse the results set.
43 /// );
44 /// ```
45 pub fn new(http_fn: F, pagination: HttpPaginationOptions) -> Self {
46 let initial_limit = pagination.limit.unwrap_or(50);
47
48 Self {
49 http_fn,
50 pagination,
51 current_batch: Vec::new(),
52 current_index: 0,
53 has_more: true,
54 initial_limit,
55 _phantom: std::marker::PhantomData,
56 }
57 }
58
59 /// Create a paginator with default pagination settings.
60 /// This starts at offset 0 with a limit of 50 per page.
61 ///
62 /// # Example
63 /// ```rust
64 /// use sms_client::http;
65 /// use sms_client::Client;
66 /// use sms_client::config::ClientConfig;
67 /// use sms_client::http::HttpClient;
68 /// use sms_client::http::paginator::HttpPaginator;
69 ///
70 /// /// View all latest numbers, in a default paginator with a limit of 50 per chunk.
71 /// async fn view_all_latest_numbers(http: HttpClient) {
72 /// let mut paginator = HttpPaginator::with_defaults(|pagination| {
73 /// http.get_latest_numbers(pagination)
74 /// });
75 /// while let Some(message) = paginator.next().await {
76 /// log::info!("{:?}", message);
77 /// }
78 /// }
79 /// ```
80 pub fn with_defaults(http_fn: F) -> Self {
81 Self::new(
82 http_fn,
83 HttpPaginationOptions::default()
84 .with_limit(50)
85 .with_offset(0),
86 )
87 }
88
89 /// Fetch the next batch of items from the API.
90 async fn fetch_next_batch(&mut self) -> HttpResult<bool> {
91 log::trace!("Fetching next batch: {:?}", self.pagination);
92 let response = (self.http_fn)(Some(self.pagination)).await?;
93
94 let received_count = response.len() as u64;
95 self.has_more = received_count >= self.initial_limit;
96
97 // If no more items have been received, we're definitely done.
98 if received_count == 0 {
99 self.has_more = false;
100 return Ok(false);
101 }
102
103 self.current_batch = response;
104 self.current_index = 0;
105
106 // Update offset for next request.
107 if let Some(current_offset) = self.pagination.offset {
108 self.pagination.offset = Some(current_offset + received_count);
109 } else {
110 // If no offset was set initially, start from the received count
111 self.pagination.offset = Some(received_count);
112 }
113
114 Ok(true)
115 }
116
117 /// Get the next item, automatically fetching next pages as needed.
118 ///
119 /// # Example
120 /// ```rust
121 /// use sms_client::http::HttpClient;
122 /// use sms_client::http::paginator::HttpPaginator;
123 ///
124 /// async fn get_delivery_reports(message_id: i64, http: HttpClient) {
125 /// let mut paginator = HttpPaginator::with_defaults(|pagination| {
126 /// http.get_delivery_reports(message_id, pagination)
127 /// }).await;
128 ///
129 /// /// Iterate through ALL messages, with a page size of 50 (default).
130 /// while let Some(message) = paginator.next().await {
131 /// log::info!("{:?}", message);
132 /// }
133 /// }
134 /// ```
135 pub async fn next(&mut self) -> Option<T> {
136 if self.current_index >= self.current_batch.len() {
137 // If there aren't any-more, then there is nothing to fetch next.
138 if !self.has_more {
139 return None;
140 }
141
142 match self.fetch_next_batch().await {
143 Ok(true) => {} // Successfully fetched more data
144 Ok(false) | Err(_) => return None, // No more data or error
145 }
146 }
147
148 // Return the next item if available.
149 if self.current_index < self.current_batch.len() {
150 let item = self.current_batch.remove(0);
151 Some(item)
152 } else {
153 None
154 }
155 }
156
157 /// Collect all remaining items into a Vec.
158 /// This continues to request batches until empty.
159 pub async fn collect_all(mut self) -> HttpResult<Vec<T>> {
160 let mut all_items = Vec::new();
161
162 if self.current_batch.is_empty() && self.has_more {
163 self.fetch_next_batch().await?;
164 }
165
166 while let Some(item) = self.next().await {
167 all_items.push(item);
168 }
169
170 Ok(all_items)
171 }
172
173 /// Process items in chunks, calling the provided closure for each chunk.
174 pub async fn take(mut self, n: usize) -> HttpResult<Vec<T>> {
175 let mut items = Vec::with_capacity(n.min(100)); // Cap initial capacity
176
177 for _ in 0..n {
178 if let Some(item) = self.next().await {
179 items.push(item);
180 } else {
181 break;
182 }
183 }
184
185 Ok(items)
186 }
187
188 /// Process items in chunks, calling the provided closure for each chunk.
189 ///
190 /// # Example
191 /// ```rust
192 /// use std::sync::Arc;
193 /// use sms_client::http::HttpClient;
194 /// use sms_client::http::paginator::HttpPaginator;
195 /// use sms_client::http::types::HttpPaginationOptions;
196 ///
197 /// /// Read all messages from a phone number, in chunks of 10.
198 /// async fn read_all_messages(phone_number: &str, http: Arc<HttpClient>) {
199 /// let paginator = HttpPaginator::with_defaults(|pagination| {
200 /// http.get_messages(phone_number, pagination)
201 /// }).await;
202 ///
203 /// paginator.for_each_chuck(10, |batch| {
204 /// for message in batch {
205 /// log::info!("{:?}", message);
206 /// }
207 /// }).await?;
208 /// }
209 /// ```
210 pub async fn for_each_chuck<C>(mut self, chunk_size: usize, mut chunk_fn: C) -> HttpResult<()>
211 where
212 C: FnMut(&[T]) -> HttpResult<()>,
213 {
214 let mut chunk = Vec::with_capacity(chunk_size);
215
216 while let Some(item) = self.next().await {
217 chunk.push(item);
218
219 if chunk.len() >= chunk_size {
220 chunk_fn(&chunk)?;
221 chunk.clear();
222 }
223 }
224
225 // Process any remaining items in the final chunk.
226 if !chunk.is_empty() {
227 chunk_fn(&chunk)?;
228 }
229
230 Ok(())
231 }
232
233 /// Skip `n` items and return the paginator.
234 pub async fn skip(mut self, n: usize) -> Self {
235 for _ in 0..n {
236 if self.next().await.is_none() {
237 break;
238 }
239 }
240 self
241 }
242
243 /// Get the current pagination options state.
244 pub fn current_pagination(&self) -> &HttpPaginationOptions {
245 &self.pagination
246 }
247
248 /// Check if there are potentially more items to fetch.
249 pub fn has_more(&self) -> bool {
250 self.has_more || self.current_index < self.current_batch.len()
251 }
252}