sms_client/http/paginator.rs
1//! HTTP request paginator, supporting lazy traversal across large sets
2
3use crate::http::types::HttpPaginationOptions;
4use crate::http::error::*;
5
6/// Call a function with an update HttpPaginationOptions for each batch request,
7/// simplifying lazy access to large response sets such as messages etc.
8pub struct HttpPaginator<T, F, Fut> {
9 http_fn: F,
10 pagination: HttpPaginationOptions,
11 current_batch: Vec<T>,
12 current_index: usize,
13 has_more: bool,
14 initial_limit: u64,
15 _phantom: std::marker::PhantomData<Fut>
16}
17impl<T, F, Fut> HttpPaginator<T, F, Fut>
18where
19 F: Fn(Option<HttpPaginationOptions>) -> Fut,
20 Fut: Future<Output = HttpResult<Vec<T>>>
21{
22
23 /// Create the paginator with the http batch generator.
24 ///
25 /// # Example
26 /// ```rust
27 /// use sms_client::Client;
28 /// use sms_client::config::ClientConfig;
29 /// use sms_client::http::paginator::HttpPaginator;
30 /// use sms_client::http::types::HttpPaginationOptions;
31 ///
32 /// let http = Client::new(ClientConfig::http_only("http://localhost:3000").with_auth("token!"))?.http_arc();
33 /// let mut paginator = HttpPaginator::new(
34 /// move |pagination| {
35 /// let http = http.clone();
36 /// async move {
37 /// http.get_latest_numbers(pagination).await
38 /// }
39 /// },
40 /// HttpPaginationOptions::default()
41 /// .with_limit(10) // Do it in batches of 10.
42 /// .with_offset(10) // Skip the first 10 results.
43 /// .with_reverse(true) // Reverse the results set.
44 /// );
45 /// ```
46 pub fn new(http_fn: F, pagination: HttpPaginationOptions) -> Self {
47 let initial_limit = pagination.limit.unwrap_or(50);
48
49 Self {
50 http_fn,
51 pagination,
52 current_batch: Vec::new(),
53 current_index: 0,
54 has_more: true,
55 initial_limit,
56 _phantom: std::marker::PhantomData
57 }
58 }
59
60 /// Create a paginator with default pagination settings.
61 /// This starts at offset 0 with a limit of 50 per page.
62 ///
63 /// # Example
64 /// ```rust
65 /// use sms_client::http;
66 /// use sms_client::Client;
67 /// use sms_client::config::ClientConfig;
68 /// use sms_client::http::HttpClient;
69 /// use sms_client::http::paginator::HttpPaginator;
70 ///
71 /// /// View all latest numbers, in a default paginator with a limit of 50 per chunk.
72 /// async fn view_all_latest_numbers(http: HttpClient) {
73 /// let mut paginator = HttpPaginator::with_defaults(|pagination| {
74 /// http.get_latest_numbers(pagination)
75 /// });
76 /// while let Some(message) = paginator.next().await {
77 /// log::info!("{:?}", message);
78 /// }
79 /// }
80 /// ```
81 pub fn with_defaults(http_fn: F) -> Self {
82 Self::new(
83 http_fn,
84 HttpPaginationOptions::default()
85 .with_limit(50)
86 .with_offset(0)
87 )
88 }
89
90 /// Fetch the next batch of items from the API.
91 async fn fetch_next_batch(&mut self) -> HttpResult<bool> {
92 log::trace!("Fetching next batch: {:?}", self.pagination);
93 let response = (self.http_fn)(Some(self.pagination.clone())).await?;
94
95 let received_count = response.len() as u64;
96 self.has_more = received_count >= self.initial_limit;
97
98 // If no more items have been received, we're definitely done.
99 if received_count == 0 {
100 self.has_more = false;
101 return Ok(false);
102 }
103
104 self.current_batch = response;
105 self.current_index = 0;
106
107 // Update offset for next request.
108 if let Some(current_offset) = self.pagination.offset {
109 self.pagination.offset = Some(current_offset + received_count);
110 } else {
111
112 // If no offset was set initially, start from the received count
113 self.pagination.offset = Some(received_count);
114 }
115
116 Ok(true)
117 }
118
119 /// Get the next item, automatically fetching next pages as needed.
120 ///
121 /// # Example
122 /// ```rust
123 /// use sms_client::http::HttpClient;
124 /// use sms_client::http::paginator::HttpPaginator;
125 ///
126 /// async fn get_delivery_reports(message_id: i64, http: HttpClient) {
127 /// let mut paginator = HttpPaginator::with_defaults(|pagination| {
128 /// http.get_delivery_reports(message_id, pagination)
129 /// }).await;
130 ///
131 /// /// Iterate through ALL messages, with a page size of 50 (default).
132 /// while let Some(message) = paginator.next().await {
133 /// log::info!("{:?}", message);
134 /// }
135 /// }
136 /// ```
137 pub async fn next(&mut self) -> Option<T> {
138 if self.current_index >= self.current_batch.len() {
139
140 // If there aren't any-more, then there is nothing to fetch next.
141 if !self.has_more {
142 return None;
143 }
144
145 match self.fetch_next_batch().await {
146 Ok(true) => {}, // Successfully fetched more data
147 Ok(false) | Err(_) => return None // No more data or error
148 }
149 }
150
151 // Return the next item if available.
152 if self.current_index < self.current_batch.len() {
153 let item = self.current_batch.remove(0);
154 Some(item)
155 } else {
156 None
157 }
158 }
159
160 /// Collect all remaining items into a Vec.
161 /// This continues to request batches until empty.
162 pub async fn collect_all(mut self) -> HttpResult<Vec<T>> {
163 let mut all_items = Vec::new();
164
165 if self.current_batch.is_empty() && self.has_more {
166 self.fetch_next_batch().await?;
167 }
168
169 while let Some(item) = self.next().await {
170 all_items.push(item);
171 }
172
173 Ok(all_items)
174 }
175
176 /// Process items in chunks, calling the provided closure for each chunk.
177 pub async fn take(mut self, n: usize) -> HttpResult<Vec<T>> {
178 let mut items = Vec::with_capacity(n.min(100)); // Cap initial capacity
179
180 for _ in 0..n {
181 if let Some(item) = self.next().await {
182 items.push(item);
183 } else {
184 break;
185 }
186 }
187
188 Ok(items)
189 }
190
191 /// Process items in chunks, calling the provided closure for each chunk.
192 ///
193 /// # Example
194 /// ```rust
195 /// use std::sync::Arc;
196 /// use sms_client::http::HttpClient;
197 /// use sms_client::http::paginator::HttpPaginator;
198 /// use sms_client::http::types::HttpPaginationOptions;
199 ///
200 /// /// Read all messages from a phone number, in chunks of 10.
201 /// async fn read_all_messages(phone_number: &str, http: Arc<HttpClient>) {
202 /// let paginator = HttpPaginator::with_defaults(|pagination| {
203 /// http.get_messages(phone_number, pagination)
204 /// }).await;
205 ///
206 /// paginator.for_each_chuck(10, |batch| {
207 /// for message in batch {
208 /// log::info!("{:?}", message);
209 /// }
210 /// }).await?;
211 /// }
212 /// ```
213 pub async fn for_each_chuck<C>(mut self, chunk_size: usize, mut chunk_fn: C) -> HttpResult<()>
214 where
215 C: FnMut(&[T]) -> HttpResult<()>
216 {
217 let mut chunk = Vec::with_capacity(chunk_size);
218
219 while let Some(item) = self.next().await {
220 chunk.push(item);
221
222 if chunk.len() >= chunk_size {
223 chunk_fn(&chunk)?;
224 chunk.clear();
225 }
226 }
227
228 // Process any remaining items in the final chunk.
229 if !chunk.is_empty() {
230 chunk_fn(&chunk)?;
231 }
232
233 Ok(())
234 }
235
236 /// Skip `n` items and return the paginator.
237 pub async fn skip(mut self, n: usize) -> Self {
238 for _ in 0..n {
239 if self.next().await.is_none() {
240 break;
241 }
242 }
243 self
244 }
245
246 /// Get the current pagination options state.
247 pub fn current_pagination(&self) -> &HttpPaginationOptions {
248 &self.pagination
249 }
250
251 /// Check if there are potentially more items to fetch.
252 pub fn has_more(&self) -> bool {
253 self.has_more || self.current_index < self.current_batch.len()
254 }
255}