Skip to main content

nifi_rust_client/
pagination.rs

1//! Pagination helpers for NiFi REST endpoints.
2//!
3//! Currently provides `HistoryPaginator` for walking
4//! `GET /flow/history` one page (or one action) at a time. See
5//! `flow_history` for the static-mode constructor and
6//! `flow_history_dynamic` (requires `dynamic` feature) for the
7//! dynamic-mode constructor.
8//!
//! The paginator is a thin state machine: one HTTP call per page,
//! offset advanced by the number of actions the page returned,
//! termination by any of
//! (a) empty page, (b) short page, (c) offset reached `total`.
12//! Retries are handled per-request by the underlying [`crate::NifiClient`];
13//! the paginator adds no retry logic of its own.
14
15use std::collections::VecDeque;
16
17#[cfg(feature = "dynamic")]
18use crate::dynamic::types::ActionEntity;
19use crate::error::NifiError;
20#[cfg(not(feature = "dynamic"))]
21use crate::types::ActionEntity;
22
23/// Boxed future returned by the fetch closures in this module.
24///
25/// Using `Pin<Box<dyn Future>>` sidesteps chained-`impl Trait` lifetime
26/// issues in the constructor return types. One allocation per page —
27/// negligible against the HTTP round trip.
28type BoxedFetchFuture<'a> = std::pin::Pin<
29    Box<dyn core::future::Future<Output = Result<HistoryPage, NifiError>> + Send + 'a>,
30>;
31
/// Optional filter criteria for `GET /flow/history`.
///
/// Every field is optional, so `HistoryFilter::default()` describes an
/// unfiltered history query.
///
/// Field names mirror NiFi's query parameters. Date fields are forwarded
/// as plain strings in the format NiFi expects (`MM/dd/yyyy HH:mm:ss`);
/// this crate neither parses nor validates them.
#[derive(Clone, Debug, Default)]
pub struct HistoryFilter {
    /// Name of the column to sort by, e.g. `"timestamp"`.
    pub sort_column: Option<String>,
    /// Sort direction: `"asc"` or `"desc"`.
    pub sort_order: Option<String>,
    /// Inclusive lower bound on the action timestamp.
    pub start_date: Option<String>,
    /// Inclusive upper bound on the action timestamp.
    pub end_date: Option<String>,
    /// Restrict results to actions performed by this user identity.
    pub user_identity: Option<String>,
    /// Restrict results to actions on this source component id.
    pub source_id: Option<String>,
}
53
54/// Shape returned by a page-fetch closure given to [`HistoryPaginator`].
55///
56/// Exposed so advanced users can drive the paginator with their own
57/// closure for endpoints not covered by `flow_history` /
58/// `flow_history_dynamic`.
59#[derive(Debug, Clone)]
60pub struct HistoryPage {
61    /// Actions returned by the server for this page.
62    pub actions: Vec<ActionEntity>,
63    /// Server-reported total number of actions matching the filter.
64    pub total: i32,
65}
66
67/// Async iterator over pages of NiFi flow history actions.
68///
69/// Created via `flow_history` or `flow_history_dynamic`. Each call
70/// to [`next_page`](Self::next_page) issues one `GET /flow/history`
71/// request advancing `offset` by `page_size`. [`next`](Self::next)
72/// yields one item at a time, buffering the current page internally.
73pub struct HistoryPaginator<F> {
74    fetch: F,
75    page_size: u32,
76    offset: u32,
77    buffer: VecDeque<ActionEntity>,
78    exhausted: bool,
79}
80
81impl<F, Fut> HistoryPaginator<F>
82where
83    F: FnMut(u32, u32) -> Fut,
84    Fut: core::future::Future<Output = Result<HistoryPage, NifiError>>,
85{
86    /// Construct a paginator directly from a fetch closure.
87    ///
88    /// Most users call `flow_history` or `flow_history_dynamic`
89    /// instead, which build the closure for the NiFi history endpoint.
90    /// Advanced callers can use this constructor to paginate their
91    /// own endpoints that follow the same offset/count + total shape.
92    pub fn from_fetcher(fetch: F, page_size: u32) -> Self {
93        Self {
94            fetch,
95            page_size,
96            offset: 0,
97            buffer: VecDeque::new(),
98            exhausted: false,
99        }
100    }
101
102    /// Fetch the next page of actions.
103    ///
104    /// Returns `Ok(None)` once the history is exhausted. Idempotent
105    /// after exhaustion — further calls return `Ok(None)` without
106    /// issuing a request.
107    pub async fn next_page(&mut self) -> Result<Option<Vec<ActionEntity>>, NifiError> {
108        if self.exhausted {
109            return Ok(None);
110        }
111        let page = (self.fetch)(self.offset, self.page_size).await?;
112
113        let returned = page.actions.len() as u32;
114        self.offset = self.offset.saturating_add(returned);
115
116        if returned == 0
117            || returned < self.page_size
118            || i64::from(self.offset) >= i64::from(page.total)
119        {
120            self.exhausted = true;
121        }
122
123        if page.actions.is_empty() {
124            Ok(None)
125        } else {
126            Ok(Some(page.actions))
127        }
128    }
129
130    /// Yield the next action, buffering pages internally.
131    ///
132    /// Returns `Ok(None)` once the history is exhausted. Each
133    /// underlying page is fetched lazily on demand via
134    /// [`next_page`](Self::next_page).
135    pub async fn next(&mut self) -> Result<Option<ActionEntity>, NifiError> {
136        loop {
137            if let Some(item) = self.buffer.pop_front() {
138                return Ok(Some(item));
139            }
140            match self.next_page().await? {
141                Some(page) => self.buffer.extend(page),
142                None => return Ok(None),
143            }
144        }
145    }
146}
147
148/// Build a [`HistoryPaginator`] backed by a static-mode [`crate::NifiClient`].
149///
150/// Each page is fetched by calling `client.flow().query_history(...)`
151/// with the provided `filter` and the current offset/page_size. Missing
152/// `actions` or `total` fields in the response surface as
153/// [`NifiError::MissingField`] via the [`crate::require!`] macro.
154///
155/// # Example
156///
157/// ```no_run
158/// use nifi_rust_client::{NifiClientBuilder, NifiError};
159/// use nifi_rust_client::pagination::{flow_history, HistoryFilter};
160///
161/// # async fn example() -> Result<(), NifiError> {
162/// let client = NifiClientBuilder::new("https://nifi.example.com:8443")?.build()?;
163/// client.login("admin", "adminpassword123").await?;
164///
165/// let mut pag = flow_history(&client, HistoryFilter::default(), 100);
166/// while let Some(page) = pag.next_page().await? {
167///     for action in page {
168///         println!("{action:?}");
169///     }
170/// }
171/// # Ok(())
172/// # }
173/// ```
174#[cfg(not(feature = "dynamic"))]
175pub fn flow_history<'a>(
176    client: &'a crate::NifiClient,
177    filter: HistoryFilter,
178    page_size: u32,
179) -> HistoryPaginator<impl FnMut(u32, u32) -> BoxedFetchFuture<'a> + 'a> {
180    use crate::require;
181    let fetch = move |offset: u32, count: u32| -> BoxedFetchFuture<'a> {
182        let filter = filter.clone();
183        Box::pin(async move {
184            let offset_s = offset.to_string();
185            let count_s = count.to_string();
186            let resp = client
187                .flow()
188                .query_history(
189                    &offset_s,
190                    &count_s,
191                    filter.sort_column.as_deref(),
192                    filter.sort_order.as_deref(),
193                    filter.start_date.as_deref(),
194                    filter.end_date.as_deref(),
195                    filter.user_identity.as_deref(),
196                    filter.source_id.as_deref(),
197                )
198                .await?;
199            let actions = require!(resp.actions).clone();
200            let total = *require!(resp.total);
201            Ok(HistoryPage { actions, total })
202        })
203    };
204    HistoryPaginator::from_fetcher(fetch, page_size)
205}
206
/// Create a [`HistoryPaginator`] that reads from a dynamic-mode
/// [`crate::dynamic::DynamicClient`].
///
/// Iteration behaves exactly as with `flow_history`. If the response
/// lacks `actions` or `total`, the [`crate::require!`] macro reports it
/// as [`NifiError::MissingField`].
#[cfg(feature = "dynamic")]
pub fn flow_history_dynamic<'a>(
    client: &'a crate::dynamic::DynamicClient,
    filter: HistoryFilter,
    page_size: u32,
) -> HistoryPaginator<impl FnMut(u32, u32) -> BoxedFetchFuture<'a> + 'a> {
    use crate::require;

    HistoryPaginator::from_fetcher(
        move |offset: u32, count: u32| -> BoxedFetchFuture<'a> {
            // The closure is `FnMut` and may run many times, so each
            // future receives its own copy of the filter to own.
            let criteria = filter.clone();
            Box::pin(async move {
                // `query_history` is given offset and count as strings.
                let (offset_param, count_param) = (offset.to_string(), count.to_string());
                // NOTE(review): the binding is deliberately still called
                // `resp` — `require!` may embed the expression text in its
                // error; confirm before renaming.
                let resp = client
                    .flow()
                    .query_history(
                        &offset_param,
                        &count_param,
                        criteria.sort_column.as_deref(),
                        criteria.sort_order.as_deref(),
                        criteria.start_date.as_deref(),
                        criteria.end_date.as_deref(),
                        criteria.user_identity.as_deref(),
                        criteria.source_id.as_deref(),
                    )
                    .await?;
                let actions = require!(resp.actions).clone();
                let total = *require!(resp.total);
                Ok(HistoryPage { actions, total })
            })
        },
        page_size,
    )
}
245
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Build an `ActionEntity` with a given id for identity in tests.
    fn make_action(id: i32) -> ActionEntity {
        ActionEntity {
            id: Some(id),
            ..ActionEntity::default()
        }
    }

    /// Build a fake fetcher that serves up to `total` actions with ids
    /// `0..total`, honoring `offset` and `count` arguments. Records
    /// the number of invocations in the returned `Arc<AtomicUsize>`.
    fn fake_fetcher(
        total: i32,
    ) -> (
        impl FnMut(u32, u32) -> BoxedFetchFuture<'static>,
        Arc<AtomicUsize>,
    ) {
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);
        let fetch = move |offset: u32, count: u32| {
            counter.fetch_add(1, Ordering::SeqCst);
            let start = offset as i32;
            let end = core::cmp::min(start.saturating_add(count as i32), total);
            let actions: Vec<ActionEntity> = if start >= total {
                Vec::new()
            } else {
                (start..end).map(make_action).collect()
            };
            let page = HistoryPage { actions, total };
            Box::pin(async move { Ok(page) }) as BoxedFetchFuture<'static>
        };
        (fetch, calls)
    }

    #[tokio::test]
    async fn next_page_walks_all_pages_then_returns_none() {
        let (fetch, calls) = fake_fetcher(250);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        let p1 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p1.len(), 100);
        let p2 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p2.len(), 100);
        let p3 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p3.len(), 50);
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn next_page_short_page_terminates() {
        let (fetch, calls) = fake_fetcher(150);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        let p1 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p1.len(), 100);
        let p2 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p2.len(), 50);
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// Every page is full, so only the `offset >= total` check can stop
    /// the walk; the trailing `None` must not cost an extra request.
    #[tokio::test]
    async fn next_page_exact_multiple_terminates_on_total() {
        let (fetch, calls) = fake_fetcher(200);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        let p1 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p1.len(), 100);
        let p2 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p2.len(), 100);
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn next_page_empty_first_response_returns_none() {
        let (fetch, calls) = fake_fetcher(0);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        assert!(pag.next_page().await.unwrap().is_none());
        assert!(pag.next_page().await.unwrap().is_none());
        // Second call after exhaustion must not re-invoke the fetcher.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn next_page_is_idempotent_after_exhaustion() {
        let (fetch, calls) = fake_fetcher(50);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        let p1 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p1.len(), 50);
        assert!(pag.next_page().await.unwrap().is_none());
        assert!(pag.next_page().await.unwrap().is_none());
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "fetcher must not be called after exhaustion"
        );
    }

    #[tokio::test]
    async fn next_page_does_not_advance_on_error() {
        // Fake fetcher that fails on call 2, then recovers.
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);
        let fetch = move |offset: u32, count: u32| {
            let n = counter.fetch_add(1, Ordering::SeqCst);
            let actions: Vec<ActionEntity> = (offset..offset + count)
                .map(|i| make_action(i as i32))
                .collect();
            let fail = n == 1;
            Box::pin(async move {
                if fail {
                    Err(NifiError::Unauthorized {
                        message: "simulated".to_string(),
                    })
                } else {
                    Ok(HistoryPage {
                        actions,
                        total: 300,
                    })
                }
            }) as BoxedFetchFuture<'static>
        };
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);

        let p1 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p1.len(), 100);
        assert!(pag.next_page().await.is_err());
        // After the error, offset must still be 100 (not 200).
        let p2 = pag.next_page().await.unwrap().unwrap();
        assert_eq!(p2.first().and_then(|a| a.id), Some(100));
    }

    /// Exercises the saturating offset arithmetic directly.
    ///
    /// Walking from offset 0 can never saturate a `u32` offset before
    /// `total` (an `i32`) stops the walk, and doing so with full pages
    /// means building billions of entities. Instead the private offset
    /// is seeded just below `u32::MAX`; a non-saturating add would
    /// overflow (and panic in debug builds) on the very first page.
    #[tokio::test]
    async fn next_page_offset_overflow_saturates() {
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);
        let page_len = 100_u32;
        // Always serves a full page, regardless of the requested window.
        let fetch = move |_offset: u32, _count: u32| {
            counter.fetch_add(1, Ordering::SeqCst);
            let actions: Vec<ActionEntity> = (0..100_i32).map(make_action).collect();
            Box::pin(async move {
                Ok(HistoryPage {
                    actions,
                    total: i32::MAX,
                })
            }) as BoxedFetchFuture<'static>
        };
        let mut pag = HistoryPaginator::from_fetcher(fetch, page_len);
        pag.offset = u32::MAX - 50;

        let page = pag.next_page().await.unwrap().unwrap();
        assert_eq!(page.len(), 100);
        assert_eq!(pag.offset, u32::MAX, "offset must saturate, not wrap");
        // The i64-widened comparison sees offset >= total and stops.
        assert!(pag.exhausted);
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn item_next_buffers_pages_and_yields_all() {
        let (fetch, calls) = fake_fetcher(5);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 2);

        let mut ids = Vec::new();
        while let Some(action) = pag.next().await.unwrap() {
            ids.push(action.id.unwrap());
        }
        assert_eq!(ids, vec![0, 1, 2, 3, 4]);
        // 5 items at page_size=2 → pages of 2/2/1 → 3 fetches.
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}