sms_client/http/
paginator.rs

1//! HTTP request paginator, supporting lazy traversal across large sets
2
3use crate::http::error::HttpResult;
4use sms_types::http::HttpPaginationOptions;
5
6/// Call a function with an update `HttpPaginationOptions` for each batch request,
7/// simplifying lazy access to large response sets such as messages etc.
8pub struct HttpPaginator<T, F, Fut> {
9    http_fn: F,
10    pagination: HttpPaginationOptions,
11    current_batch: Vec<T>,
12    current_index: usize,
13    has_more: bool,
14    initial_limit: u64,
15    _phantom: std::marker::PhantomData<Fut>,
16}
17impl<T, F, Fut> HttpPaginator<T, F, Fut>
18where
19    F: Fn(Option<HttpPaginationOptions>) -> Fut,
20    Fut: Future<Output = HttpResult<Vec<T>>>,
21{
22    /// Create the paginator with the http batch generator.
23    ///
24    /// # Example
25    /// ```
26    /// use sms_client::Client;
27    /// use sms_client::config::ClientConfig;
28    /// use sms_client::http::paginator::HttpPaginator;
29    /// use sms_client::types::http::HttpPaginationOptions;
30    ///
31    /// let http = Client::new(ClientConfig::http_only("http://localhost:3000").with_auth("token!"))?.http_arc();
32    /// let mut paginator = HttpPaginator::new(
33    ///     move |pagination| {
34    ///         let http = http.expect("Missing HTTP client configuration!").clone();
35    ///         async move {
36    ///             http.get_latest_numbers(pagination).await
37    ///         }
38    ///     },
39    ///     HttpPaginationOptions::default()
40    ///         .with_limit(10) // Do it in batches of 10.
41    ///         .with_offset(10) // Skip the first 10 results.
42    ///         .with_reverse(true) // Reverse the results set.
43    /// );
44    /// ```
45    pub fn new(http_fn: F, pagination: HttpPaginationOptions) -> Self {
46        let initial_limit = pagination.limit.unwrap_or(50);
47
48        Self {
49            http_fn,
50            pagination,
51            current_batch: Vec::new(),
52            current_index: 0,
53            has_more: true,
54            initial_limit,
55            _phantom: std::marker::PhantomData,
56        }
57    }
58
59    /// Create a paginator with default pagination settings.
60    /// This starts at offset 0 with a limit of 50 per page.
61    ///
62    /// # Example
63    /// ```
64    /// use sms_client::Client;
65    /// use sms_client::config::ClientConfig;
66    /// use sms_client::http::HttpClient;
67    /// use sms_client::http::paginator::HttpPaginator;
68    ///
69    /// /// View all latest numbers, in a default paginator with a limit of 50 per chunk.
70    /// async fn view_all_latest_numbers(http: HttpClient) {
71    ///     let mut paginator = HttpPaginator::with_defaults(|pagination| {
72    ///         http.get_latest_numbers(pagination)
73    ///     });
74    ///     while let Some(message) = paginator.next().await {
75    ///         log::info!("{:?}", message);
76    ///     }
77    /// }
78    /// ```
79    pub fn with_defaults(http_fn: F) -> Self {
80        Self::new(
81            http_fn,
82            HttpPaginationOptions::default()
83                .with_limit(50)
84                .with_offset(0),
85        )
86    }
87
88    /// Fetch the next batch of items from the API.
89    async fn fetch_next_batch(&mut self) -> HttpResult<bool> {
90        let response = (self.http_fn)(Some(self.pagination)).await?;
91
92        let received_count = response.len() as u64;
93        self.has_more = received_count >= self.initial_limit;
94
95        // If no more items have been received, we're definitely done.
96        if received_count == 0 {
97            self.has_more = false;
98            return Ok(false);
99        }
100
101        self.current_batch = response;
102        self.current_index = 0;
103
104        // Update offset for next request.
105        if let Some(current_offset) = self.pagination.offset {
106            self.pagination.offset = Some(current_offset + received_count);
107        } else {
108            // If no offset was set initially, start from the received count
109            self.pagination.offset = Some(received_count);
110        }
111
112        Ok(true)
113    }
114
115    /// Get the next item, automatically fetching next pages as needed.
116    ///
117    /// # Example
118    /// ```
119    /// use sms_client::http::HttpClient;
120    /// use sms_client::http::paginator::HttpPaginator;
121    ///
122    /// async fn get_delivery_reports(message_id: i64, http: HttpClient) {
123    ///     let mut paginator = HttpPaginator::with_defaults(|pagination| {
124    ///         http.get_delivery_reports(message_id, pagination)
125    ///     }).await;
126    ///
127    ///     /// Iterate through ALL messages, with a page size of 50 (default).
128    ///     while let Some(message) = paginator.next().await {
129    ///         log::info!("{:?}", message);
130    ///     }
131    /// }
132    /// ```
133    pub async fn next(&mut self) -> Option<T> {
134        if self.current_index >= self.current_batch.len() {
135            // If there aren't any-more, then there is nothing to fetch next.
136            if !self.has_more {
137                return None;
138            }
139
140            match self.fetch_next_batch().await {
141                Ok(true) => {}                     // Successfully fetched more data
142                Ok(false) | Err(_) => return None, // No more data or error
143            }
144        }
145
146        // Return the next item if available.
147        if self.current_index < self.current_batch.len() {
148            let item = self.current_batch.remove(0);
149            Some(item)
150        } else {
151            None
152        }
153    }
154
155    /// Collect all remaining items into a Vec.
156    /// This continues to request batches until empty.
157    pub async fn collect_all(mut self) -> HttpResult<Vec<T>> {
158        let mut all_items = Vec::new();
159
160        if self.current_batch.is_empty() && self.has_more {
161            self.fetch_next_batch().await?;
162        }
163
164        while let Some(item) = self.next().await {
165            all_items.push(item);
166        }
167
168        Ok(all_items)
169    }
170
171    /// Process items in chunks, calling the provided closure for each chunk.
172    pub async fn take(mut self, n: usize) -> HttpResult<Vec<T>> {
173        let mut items = Vec::with_capacity(n.min(100)); // Cap initial capacity
174
175        for _ in 0..n {
176            if let Some(item) = self.next().await {
177                items.push(item);
178            } else {
179                break;
180            }
181        }
182
183        Ok(items)
184    }
185
186    /// Process items in chunks, calling the provided closure for each chunk.
187    ///
188    /// # Example
189    /// ```
190    /// use std::sync::Arc;
191    /// use sms_client::http::HttpClient;
192    /// use sms_client::http::paginator::HttpPaginator;
193    /// use sms_client::types::http::HttpPaginationOptions;
194    ///
195    /// /// Read all messages from a phone number, in chunks of 10.
196    /// async fn read_all_messages(phone_number: &str, http: Arc<HttpClient>) {
197    ///     let paginator = HttpPaginator::with_defaults(|pagination| {
198    ///         http.get_messages(phone_number, pagination)
199    ///     }).await;
200    ///
201    ///     paginator.for_each_chuck(10, |batch| {
202    ///         for message in batch {
203    ///             log::info!("{:?}", message);
204    ///         }
205    ///     }).await?;
206    /// }
207    /// ```
208    pub async fn for_each_chuck<C>(mut self, chunk_size: usize, mut chunk_fn: C) -> HttpResult<()>
209    where
210        C: FnMut(&[T]) -> HttpResult<()>,
211    {
212        let mut chunk = Vec::with_capacity(chunk_size);
213
214        while let Some(item) = self.next().await {
215            chunk.push(item);
216
217            if chunk.len() >= chunk_size {
218                chunk_fn(&chunk)?;
219                chunk.clear();
220            }
221        }
222
223        // Process any remaining items in the final chunk.
224        if !chunk.is_empty() {
225            chunk_fn(&chunk)?;
226        }
227
228        Ok(())
229    }
230
231    /// Skip `n` items and return the paginator.
232    pub async fn skip(mut self, n: usize) -> Self {
233        for _ in 0..n {
234            if self.next().await.is_none() {
235                break;
236            }
237        }
238        self
239    }
240
241    /// Get the current pagination options state.
242    pub fn current_pagination(&self) -> &HttpPaginationOptions {
243        &self.pagination
244    }
245
246    /// Check if there are potentially more items to fetch.
247    pub fn has_more(&self) -> bool {
248        self.has_more || self.current_index < self.current_batch.len()
249    }
250}