sms_client/http/
paginator.rs

1//! HTTP request paginator, supporting lazy traversal across large sets
2
3use crate::http::error::HttpResult;
4use crate::http::types::HttpPaginationOptions;
5
6/// Call a function with an update `HttpPaginationOptions` for each batch request,
7/// simplifying lazy access to large response sets such as messages etc.
8pub struct HttpPaginator<T, F, Fut> {
9    http_fn: F,
10    pagination: HttpPaginationOptions,
11    current_batch: Vec<T>,
12    current_index: usize,
13    has_more: bool,
14    initial_limit: u64,
15    _phantom: std::marker::PhantomData<Fut>,
16}
17impl<T, F, Fut> HttpPaginator<T, F, Fut>
18where
19    F: Fn(Option<HttpPaginationOptions>) -> Fut,
20    Fut: Future<Output = HttpResult<Vec<T>>>,
21{
22    /// Create the paginator with the http batch generator.
23    ///
24    /// # Example
25    /// ```rust
26    /// use sms_client::Client;
27    /// use sms_client::config::ClientConfig;
28    /// use sms_client::http::paginator::HttpPaginator;
29    /// use sms_client::http::types::HttpPaginationOptions;
30    ///
31    /// let http = Client::new(ClientConfig::http_only("http://localhost:3000").with_auth("token!"))?.http_arc();
32    /// let mut paginator = HttpPaginator::new(
33    ///     move |pagination| {
34    ///         let http = http.clone();
35    ///         async move {
36    ///             http.get_latest_numbers(pagination).await
37    ///         }
38    ///     },
39    ///     HttpPaginationOptions::default()
40    ///         .with_limit(10) // Do it in batches of 10.
41    ///         .with_offset(10) // Skip the first 10 results.
42    ///         .with_reverse(true) // Reverse the results set.
43    /// );
44    /// ```
45    pub fn new(http_fn: F, pagination: HttpPaginationOptions) -> Self {
46        let initial_limit = pagination.limit.unwrap_or(50);
47
48        Self {
49            http_fn,
50            pagination,
51            current_batch: Vec::new(),
52            current_index: 0,
53            has_more: true,
54            initial_limit,
55            _phantom: std::marker::PhantomData,
56        }
57    }
58
59    /// Create a paginator with default pagination settings.
60    /// This starts at offset 0 with a limit of 50 per page.
61    ///
62    /// # Example
63    /// ```rust
64    /// use sms_client::http;
65    /// use sms_client::Client;
66    /// use sms_client::config::ClientConfig;
67    /// use sms_client::http::HttpClient;
68    /// use sms_client::http::paginator::HttpPaginator;
69    ///
70    /// /// View all latest numbers, in a default paginator with a limit of 50 per chunk.
71    /// async fn view_all_latest_numbers(http: HttpClient) {
72    ///     let mut paginator = HttpPaginator::with_defaults(|pagination| {
73    ///         http.get_latest_numbers(pagination)
74    ///     });
75    ///     while let Some(message) = paginator.next().await {
76    ///         log::info!("{:?}", message);
77    ///     }
78    /// }
79    /// ```
80    pub fn with_defaults(http_fn: F) -> Self {
81        Self::new(
82            http_fn,
83            HttpPaginationOptions::default()
84                .with_limit(50)
85                .with_offset(0),
86        )
87    }
88
89    /// Fetch the next batch of items from the API.
90    async fn fetch_next_batch(&mut self) -> HttpResult<bool> {
91        log::trace!("Fetching next batch: {:?}", self.pagination);
92        let response = (self.http_fn)(Some(self.pagination)).await?;
93
94        let received_count = response.len() as u64;
95        self.has_more = received_count >= self.initial_limit;
96
97        // If no more items have been received, we're definitely done.
98        if received_count == 0 {
99            self.has_more = false;
100            return Ok(false);
101        }
102
103        self.current_batch = response;
104        self.current_index = 0;
105
106        // Update offset for next request.
107        if let Some(current_offset) = self.pagination.offset {
108            self.pagination.offset = Some(current_offset + received_count);
109        } else {
110            // If no offset was set initially, start from the received count
111            self.pagination.offset = Some(received_count);
112        }
113
114        Ok(true)
115    }
116
117    /// Get the next item, automatically fetching next pages as needed.
118    ///
119    /// # Example
120    /// ```rust
121    /// use sms_client::http::HttpClient;
122    /// use sms_client::http::paginator::HttpPaginator;
123    ///
124    /// async fn get_delivery_reports(message_id: i64, http: HttpClient) {
125    ///     let mut paginator = HttpPaginator::with_defaults(|pagination| {
126    ///         http.get_delivery_reports(message_id, pagination)
127    ///     }).await;
128    ///
129    ///     /// Iterate through ALL messages, with a page size of 50 (default).
130    ///     while let Some(message) = paginator.next().await {
131    ///         log::info!("{:?}", message);
132    ///     }
133    /// }
134    /// ```
135    pub async fn next(&mut self) -> Option<T> {
136        if self.current_index >= self.current_batch.len() {
137            // If there aren't any-more, then there is nothing to fetch next.
138            if !self.has_more {
139                return None;
140            }
141
142            match self.fetch_next_batch().await {
143                Ok(true) => {}                     // Successfully fetched more data
144                Ok(false) | Err(_) => return None, // No more data or error
145            }
146        }
147
148        // Return the next item if available.
149        if self.current_index < self.current_batch.len() {
150            let item = self.current_batch.remove(0);
151            Some(item)
152        } else {
153            None
154        }
155    }
156
157    /// Collect all remaining items into a Vec.
158    /// This continues to request batches until empty.
159    pub async fn collect_all(mut self) -> HttpResult<Vec<T>> {
160        let mut all_items = Vec::new();
161
162        if self.current_batch.is_empty() && self.has_more {
163            self.fetch_next_batch().await?;
164        }
165
166        while let Some(item) = self.next().await {
167            all_items.push(item);
168        }
169
170        Ok(all_items)
171    }
172
173    /// Process items in chunks, calling the provided closure for each chunk.
174    pub async fn take(mut self, n: usize) -> HttpResult<Vec<T>> {
175        let mut items = Vec::with_capacity(n.min(100)); // Cap initial capacity
176
177        for _ in 0..n {
178            if let Some(item) = self.next().await {
179                items.push(item);
180            } else {
181                break;
182            }
183        }
184
185        Ok(items)
186    }
187
188    /// Process items in chunks, calling the provided closure for each chunk.
189    ///
190    /// # Example
191    /// ```rust
192    /// use std::sync::Arc;
193    /// use sms_client::http::HttpClient;
194    /// use sms_client::http::paginator::HttpPaginator;
195    /// use sms_client::http::types::HttpPaginationOptions;
196    ///
197    /// /// Read all messages from a phone number, in chunks of 10.
198    /// async fn read_all_messages(phone_number: &str, http: Arc<HttpClient>) {
199    ///     let paginator = HttpPaginator::with_defaults(|pagination| {
200    ///         http.get_messages(phone_number, pagination)
201    ///     }).await;
202    ///
203    ///     paginator.for_each_chuck(10, |batch| {
204    ///         for message in batch {
205    ///             log::info!("{:?}", message);
206    ///         }
207    ///     }).await?;
208    /// }
209    /// ```
210    pub async fn for_each_chuck<C>(mut self, chunk_size: usize, mut chunk_fn: C) -> HttpResult<()>
211    where
212        C: FnMut(&[T]) -> HttpResult<()>,
213    {
214        let mut chunk = Vec::with_capacity(chunk_size);
215
216        while let Some(item) = self.next().await {
217            chunk.push(item);
218
219            if chunk.len() >= chunk_size {
220                chunk_fn(&chunk)?;
221                chunk.clear();
222            }
223        }
224
225        // Process any remaining items in the final chunk.
226        if !chunk.is_empty() {
227            chunk_fn(&chunk)?;
228        }
229
230        Ok(())
231    }
232
233    /// Skip `n` items and return the paginator.
234    pub async fn skip(mut self, n: usize) -> Self {
235        for _ in 0..n {
236            if self.next().await.is_none() {
237                break;
238            }
239        }
240        self
241    }
242
243    /// Get the current pagination options state.
244    pub fn current_pagination(&self) -> &HttpPaginationOptions {
245        &self.pagination
246    }
247
248    /// Check if there are potentially more items to fetch.
249    pub fn has_more(&self) -> bool {
250        self.has_more || self.current_index < self.current_batch.len()
251    }
252}