1use std::collections::VecDeque;
16
17#[cfg(feature = "dynamic")]
18use crate::dynamic::types::ActionEntity;
19use crate::error::NifiError;
20#[cfg(not(feature = "dynamic"))]
21use crate::types::ActionEntity;
22
23type BoxedFetchFuture<'a> = std::pin::Pin<
29 Box<dyn core::future::Future<Output = Result<HistoryPage, NifiError>> + Send + 'a>,
30>;
31
/// Optional filter and sort arguments for a history query.
///
/// Every field defaults to `None`. The exact value formats are defined by the
/// endpoint that receives them and are not validated here.
#[derive(Clone, Debug, Default)]
pub struct HistoryFilter {
    /// Sort column to request, if any.
    pub sort_column: Option<String>,
    /// Sort order to request, if any.
    pub sort_order: Option<String>,
    /// Start-date bound to request, if any.
    pub start_date: Option<String>,
    /// End-date bound to request, if any.
    pub end_date: Option<String>,
    /// User identity to filter on, if any.
    pub user_identity: Option<String>,
    /// Source id to filter on, if any.
    pub source_id: Option<String>,
}
53
54#[derive(Debug, Clone)]
60pub struct HistoryPage {
61 pub actions: Vec<ActionEntity>,
63 pub total: i32,
65}
66
67pub struct HistoryPaginator<F> {
74 fetch: F,
75 page_size: u32,
76 offset: u32,
77 buffer: VecDeque<ActionEntity>,
78 exhausted: bool,
79}
80
81impl<F, Fut> HistoryPaginator<F>
82where
83 F: FnMut(u32, u32) -> Fut,
84 Fut: core::future::Future<Output = Result<HistoryPage, NifiError>>,
85{
86 pub fn from_fetcher(fetch: F, page_size: u32) -> Self {
93 Self {
94 fetch,
95 page_size,
96 offset: 0,
97 buffer: VecDeque::new(),
98 exhausted: false,
99 }
100 }
101
102 pub async fn next_page(&mut self) -> Result<Option<Vec<ActionEntity>>, NifiError> {
108 if self.exhausted {
109 return Ok(None);
110 }
111 let page = (self.fetch)(self.offset, self.page_size).await?;
112
113 let returned = page.actions.len() as u32;
114 self.offset = self.offset.saturating_add(returned);
115
116 if returned == 0
117 || returned < self.page_size
118 || i64::from(self.offset) >= i64::from(page.total)
119 {
120 self.exhausted = true;
121 }
122
123 if page.actions.is_empty() {
124 Ok(None)
125 } else {
126 Ok(Some(page.actions))
127 }
128 }
129
130 pub async fn next(&mut self) -> Result<Option<ActionEntity>, NifiError> {
136 loop {
137 if let Some(item) = self.buffer.pop_front() {
138 return Ok(Some(item));
139 }
140 match self.next_page().await? {
141 Some(page) => self.buffer.extend(page),
142 None => return Ok(None),
143 }
144 }
145 }
146}
147
148#[cfg(not(feature = "dynamic"))]
175pub fn flow_history<'a>(
176 client: &'a crate::NifiClient,
177 filter: HistoryFilter,
178 page_size: u32,
179) -> HistoryPaginator<impl FnMut(u32, u32) -> BoxedFetchFuture<'a> + 'a> {
180 use crate::require;
181 let fetch = move |offset: u32, count: u32| -> BoxedFetchFuture<'a> {
182 let filter = filter.clone();
183 Box::pin(async move {
184 let offset_s = offset.to_string();
185 let count_s = count.to_string();
186 let resp = client
187 .flow()
188 .query_history(
189 &offset_s,
190 &count_s,
191 filter.sort_column.as_deref(),
192 filter.sort_order.as_deref(),
193 filter.start_date.as_deref(),
194 filter.end_date.as_deref(),
195 filter.user_identity.as_deref(),
196 filter.source_id.as_deref(),
197 )
198 .await?;
199 let actions = require!(resp.actions).clone();
200 let total = *require!(resp.total);
201 Ok(HistoryPage { actions, total })
202 })
203 };
204 HistoryPaginator::from_fetcher(fetch, page_size)
205}
206
/// Builds a [`HistoryPaginator`] whose pages come from
/// `client.flow().query_history(..)` on the dynamic client.
///
/// `filter` is moved into the fetch closure and cloned once per page request
/// so each returned future owns its own copy. `page_size` is forwarded as the
/// per-request count.
///
/// # Errors
///
/// The paginator's futures resolve to an error when `query_history` fails or
/// when `require!` rejects `resp.actions` / `resp.total` (presumably because
/// the field is absent — confirm against the `require!` macro).
#[cfg(feature = "dynamic")]
pub fn flow_history_dynamic<'a>(
    client: &'a crate::dynamic::DynamicClient,
    filter: HistoryFilter,
    page_size: u32,
) -> HistoryPaginator<impl FnMut(u32, u32) -> BoxedFetchFuture<'a> + 'a> {
    use crate::require;

    let fetch_page = move |offset: u32, count: u32| -> BoxedFetchFuture<'a> {
        // The future must own its filter: it may outlive this closure call.
        let page_filter = filter.clone();
        Box::pin(async move {
            // The endpoint takes offset and count as string arguments.
            let (offset_arg, count_arg) = (offset.to_string(), count.to_string());
            let resp = client
                .flow()
                .query_history(
                    &offset_arg,
                    &count_arg,
                    page_filter.sort_column.as_deref(),
                    page_filter.sort_order.as_deref(),
                    page_filter.start_date.as_deref(),
                    page_filter.end_date.as_deref(),
                    page_filter.user_identity.as_deref(),
                    page_filter.source_id.as_deref(),
                )
                .await?;
            // Fields are evaluated in written order: `actions` before `total`.
            Ok(HistoryPage {
                actions: require!(resp.actions).clone(),
                total: *require!(resp.total),
            })
        })
    };

    HistoryPaginator::from_fetcher(fetch_page, page_size)
}
245
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Boxed future type returned by every in-memory fetcher in this module.
    type TestFuture = BoxedFetchFuture<'static>;

    /// Builds an otherwise-default `ActionEntity` carrying the given id.
    fn make_action(id: i32) -> ActionEntity {
        ActionEntity {
            id: Some(id),
            ..ActionEntity::default()
        }
    }

    /// Returns an in-memory fetcher serving ids `0..total`, plus a counter of
    /// how many times the fetcher has been invoked.
    fn fake_fetcher(total: i32) -> (impl FnMut(u32, u32) -> TestFuture, Arc<AtomicUsize>) {
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);
        let fetch = move |offset: u32, count: u32| -> TestFuture {
            counter.fetch_add(1, Ordering::SeqCst);
            let start = offset as i32;
            let end = start.saturating_add(count as i32).min(total);
            // When `start >= total`, `end <= start` and the range is empty.
            let actions: Vec<ActionEntity> = (start..end).map(make_action).collect();
            Box::pin(async move { Ok(HistoryPage { actions, total }) })
        };
        (fetch, calls)
    }

    /// Full pages followed by a short page, then `None` without another fetch.
    #[tokio::test]
    async fn next_page_walks_all_pages_then_returns_none() {
        let (fetch, calls) = fake_fetcher(250);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        for expected_len in [100, 100, 50] {
            let page = pag.next_page().await.unwrap().unwrap();
            assert_eq!(page.len(), expected_len);
        }
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// A page shorter than `page_size` ends pagination.
    #[tokio::test]
    async fn next_page_short_page_terminates() {
        let (fetch, calls) = fake_fetcher(150);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        for expected_len in [100, 50] {
            let page = pag.next_page().await.unwrap().unwrap();
            assert_eq!(page.len(), expected_len);
        }
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// An empty first response yields `None` and is fetched only once.
    #[tokio::test]
    async fn next_page_empty_first_response_returns_none() {
        let (fetch, calls) = fake_fetcher(0);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        for _ in 0..2 {
            assert!(pag.next_page().await.unwrap().is_none());
        }
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// Repeated calls after exhaustion keep returning `None` without fetching.
    #[tokio::test]
    async fn next_page_is_idempotent_after_exhaustion() {
        let (fetch, calls) = fake_fetcher(50);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        let only_page = pag.next_page().await.unwrap().unwrap();
        assert_eq!(only_page.len(), 50);
        for _ in 0..3 {
            assert!(pag.next_page().await.unwrap().is_none());
        }
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "fetcher must not be called after exhaustion"
        );
    }

    /// A failed fetch leaves the offset where it was, so the retry resumes at
    /// the same position.
    #[tokio::test]
    async fn next_page_does_not_advance_on_error() {
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);
        let fetch = move |offset: u32, count: u32| -> TestFuture {
            // `fetch_add` returns the previous value, so index 1 is the second call.
            let call_index = counter.fetch_add(1, Ordering::SeqCst);
            let actions: Vec<ActionEntity> = (offset..offset + count)
                .map(|i| make_action(i as i32))
                .collect();
            Box::pin(async move {
                if call_index == 1 {
                    Err(NifiError::Unauthorized {
                        message: "simulated".to_string(),
                    })
                } else {
                    Ok(HistoryPage {
                        actions,
                        total: 300,
                    })
                }
            })
        };
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        let first = pag.next_page().await.unwrap().unwrap();
        assert_eq!(first.len(), 100);
        assert!(pag.next_page().await.is_err());
        let retried = pag.next_page().await.unwrap().unwrap();
        assert_eq!(retried.first().and_then(|a| a.id), Some(100));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// The offset saturates at `u32::MAX` rather than wrapping.
    ///
    /// Because `total` is an `i32`, pagination from offset 0 always stops long
    /// before the offset nears `u32::MAX`. The cursor is therefore placed just
    /// below the limit directly, so a single small page exercises saturation.
    #[tokio::test]
    async fn next_page_offset_overflow_saturates() {
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);
        let fetch = move |offset: u32, count: u32| -> TestFuture {
            counter.fetch_add(1, Ordering::SeqCst);
            let actions: Vec<ActionEntity> = (0..count)
                .map(|i| make_action((offset as i32).wrapping_add(i as i32)))
                .collect();
            Box::pin(async move {
                Ok(HistoryPage {
                    actions,
                    total: i32::MAX,
                })
            })
        };
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);
        // 50 below the limit: adding a 100-item page would overflow a plain `+`.
        pag.offset = u32::MAX - 50;

        let page = pag.next_page().await.unwrap().unwrap();
        assert_eq!(page.len(), 100);
        assert_eq!(pag.offset, u32::MAX, "offset must saturate, not wrap");
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// `next` drains pages item by item, in order, across page boundaries.
    #[tokio::test]
    async fn item_next_buffers_pages_and_yields_all() {
        let (fetch, calls) = fake_fetcher(5);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 2);

        let mut seen_ids = Vec::new();
        while let Some(action) = pag.next().await.unwrap() {
            seen_ids.push(action.id.unwrap());
        }
        assert_eq!(seen_ids, vec![0, 1, 2, 3, 4]);
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}