salesforce_client/
pagination.rs1use crate::error::{SfError, SfResult};
7use serde::de::DeserializeOwned;
8use serde::Deserialize;
9use tracing::{debug, info};
10
11#[derive(Debug, Deserialize)]
13pub(crate) struct QueryResponse<T> {
14 pub records: Vec<T>,
16
17 pub done: bool,
19
20 #[serde(rename = "totalSize")]
22 #[allow(dead_code)]
23 pub total_size: Option<i32>,
24
25 #[serde(rename = "nextRecordsUrl")]
27 pub next_records_url: Option<String>,
28}
29
30impl<T> QueryResponse<T> {
31 #[allow(dead_code)]
33 pub fn has_more(&self) -> bool {
34 !self.done && self.next_records_url.is_some()
35 }
36}
37
38pub struct PaginatedQuery<T> {
51 client: reqwest::Client,
52 base_url: String,
53 access_token: String,
54 next_url: Option<String>,
55 finished: bool,
56 _phantom: std::marker::PhantomData<T>,
57}
58
59impl<T: DeserializeOwned> PaginatedQuery<T> {
60 pub(crate) fn new(
62 client: reqwest::Client,
63 base_url: String,
64 access_token: String,
65 initial_url: Option<String>,
66 ) -> Self {
67 let finished = initial_url.is_none();
68 Self {
69 client,
70 base_url,
71 access_token,
72 next_url: initial_url,
73 finished,
74 _phantom: std::marker::PhantomData,
75 }
76 }
77
78 pub async fn next(&mut self) -> SfResult<Option<Vec<T>>> {
80 if self.finished {
81 return Ok(None);
82 }
83
84 let url = match &self.next_url {
85 Some(path) => {
86 if path.starts_with("http") {
88 path.clone()
89 } else {
90 format!("{}{}", self.base_url, path)
91 }
92 }
93 None => {
94 self.finished = true;
95 return Ok(None);
96 }
97 };
98
99 debug!("Fetching paginated results from: {}", url);
100
101 let response = self
102 .client
103 .get(&url)
104 .header("Authorization", format!("Bearer {}", self.access_token))
105 .send()
106 .await?;
107
108 let status = response.status();
109 if !status.is_success() {
110 let body = response.text().await?;
111 return Err(SfError::Api {
112 status: status.as_u16(),
113 body,
114 });
115 }
116
117 let query_response: QueryResponse<T> = response.json().await?;
118
119 if query_response.done {
120 self.finished = true;
121 self.next_url = None;
122 info!("Pagination complete");
123 } else {
124 self.next_url = query_response.next_records_url;
125 debug!("More records available, next URL: {:?}", self.next_url);
126 }
127
128 Ok(Some(query_response.records))
129 }
130
131 pub async fn collect_all(mut self) -> SfResult<Vec<T>> {
136 let mut all_records = Vec::new();
137
138 while let Some(batch) = self.next().await? {
139 all_records.extend(batch);
140 }
141
142 info!(
143 "Collected {} total records across all pages",
144 all_records.len()
145 );
146 Ok(all_records)
147 }
148}
149
150#[derive(Debug, Clone)]
152pub struct QueryOptions {
153 pub limit: Option<usize>,
155
156 pub batch_size: usize,
158
159 pub auto_paginate: bool,
161}
162
163impl Default for QueryOptions {
164 fn default() -> Self {
165 Self {
166 limit: None,
167 batch_size: 2000,
168 auto_paginate: true,
169 }
170 }
171}
172
173impl QueryOptions {
174 pub fn new() -> Self {
176 Self::default()
177 }
178
179 pub fn limit(mut self, limit: usize) -> Self {
181 self.limit = Some(limit);
182 self
183 }
184
185 pub fn batch_size(mut self, size: usize) -> Self {
187 self.batch_size = size.min(2000); self
189 }
190
191 pub fn no_pagination(mut self) -> Self {
193 self.auto_paginate = false;
194 self
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201
202 #[test]
203 fn test_query_options_builder() {
204 let opts = QueryOptions::new().limit(1000).batch_size(500);
205
206 assert_eq!(opts.limit, Some(1000));
207 assert_eq!(opts.batch_size, 500);
208 assert!(opts.auto_paginate);
209 }
210
211 #[test]
212 fn test_query_options_max_batch_size() {
213 let opts = QueryOptions::new().batch_size(5000);
214
215 assert_eq!(opts.batch_size, 2000);
217 }
218}