use std::collections::VecDeque;
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ActionEntity;
use crate::error::NifiError;
#[cfg(not(feature = "dynamic"))]
use crate::types::ActionEntity;
/// Heap-allocated, pinned, `Send` future that resolves to one [`HistoryPage`]
/// or a [`NifiError`].
///
/// This is the return type of the fetch closures built in this file; boxing
/// gives their `async` blocks a type that can be written in a signature.
type BoxedFetchFuture<'a> = std::pin::Pin<
Box<dyn core::future::Future<Output = Result<HistoryPage, NifiError>> + Send + 'a>,
>;
/// Optional query filters for a history listing.
///
/// The paginator constructors in this file forward every field unchanged
/// (as `Option<&str>`) to `query_history`; nothing is validated or parsed
/// here. The `Default` value leaves every filter unset.
#[derive(Default, Debug, Clone)]
pub struct HistoryFilter {
/// Column to sort by; forwarded verbatim when set.
pub sort_column: Option<String>,
/// Sort direction; forwarded verbatim when set.
pub sort_order: Option<String>,
/// Lower date bound; forwarded verbatim (expected format is defined by the
/// server API, not by this file).
pub start_date: Option<String>,
/// Upper date bound; forwarded verbatim (expected format is defined by the
/// server API, not by this file).
pub end_date: Option<String>,
/// User identity to filter on; forwarded verbatim when set.
pub user_identity: Option<String>,
/// Source component id to filter on; forwarded verbatim when set.
pub source_id: Option<String>,
}
/// One page of history results, as produced by a paginator's fetch closure.
#[derive(Debug, Clone)]
pub struct HistoryPage {
/// The actions contained in this page.
pub actions: Vec<ActionEntity>,
/// Total count reported alongside the page. The paginator treats itself as
/// exhausted once its running offset reaches this value.
pub total: i32,
}
/// Offset-based paginator over history actions, driven by a caller-supplied
/// fetch closure `F`.
pub struct HistoryPaginator<F> {
// Closure invoked as `fetch(offset, page_size)` to obtain one page.
fetch: F,
// Page size passed to every `fetch` call.
page_size: u32,
// Offset for the next fetch; advanced by the number of actions returned.
offset: u32,
// Actions from the most recently fetched page not yet handed out by `next`.
buffer: VecDeque<ActionEntity>,
// Set once a page signals the end of the data; no further fetches happen.
exhausted: bool,
}
impl<F, Fut> HistoryPaginator<F>
where
    F: FnMut(u32, u32) -> Fut,
    Fut: core::future::Future<Output = Result<HistoryPage, NifiError>>,
{
    /// Builds a paginator around `fetch`, starting at offset 0 with an empty
    /// item buffer.
    ///
    /// `fetch` is invoked as `fetch(offset, page_size)` and is expected to
    /// resolve to the page that begins at `offset`.
    pub fn from_fetcher(fetch: F, page_size: u32) -> Self {
        Self {
            fetch,
            page_size,
            offset: 0,
            buffer: VecDeque::new(),
            exhausted: false,
        }
    }

    /// Fetches the next page of actions.
    ///
    /// Returns `Ok(Some(actions))` for a non-empty page and `Ok(None)` when
    /// the fetched page is empty or the paginator is already exhausted (in
    /// which case the fetcher is not called again).
    ///
    /// The paginator becomes exhausted when a page is empty, is shorter than
    /// the page size, or moves the running offset to or past the page's
    /// reported `total`.
    ///
    /// This method does not read or fill the buffer used by [`Self::next`].
    ///
    /// # Errors
    ///
    /// Propagates the fetcher's error. No state is modified in that case, so
    /// a subsequent call retries the same offset.
    pub async fn next_page(&mut self) -> Result<Option<Vec<ActionEntity>>, NifiError> {
        if self.exhausted {
            return Ok(None);
        }

        // `?` returns before any field is touched, which is what makes a
        // failed fetch retryable at the same offset.
        let page = (self.fetch)(self.offset, self.page_size).await?;

        // Saturate instead of truncating: an `as u32` cast would wrap for a
        // length above `u32::MAX` and corrupt the offset/termination logic.
        let returned = u32::try_from(page.actions.len()).unwrap_or(u32::MAX);
        self.offset = self.offset.saturating_add(returned);

        // Compare in `i64` so a `u32` offset and an `i32` total (possibly
        // negative) can be ordered without a lossy conversion.
        let reached_total = i64::from(self.offset) >= i64::from(page.total);
        if returned == 0 || returned < self.page_size || reached_total {
            self.exhausted = true;
        }

        if page.actions.is_empty() {
            Ok(None)
        } else {
            Ok(Some(page.actions))
        }
    }

    /// Yields the next single action, fetching a new page whenever the
    /// internal buffer is empty.
    ///
    /// Returns `Ok(None)` once the buffer is drained and
    /// [`Self::next_page`] reports no further data.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::next_page`].
    pub async fn next(&mut self) -> Result<Option<ActionEntity>, NifiError> {
        loop {
            if let Some(item) = self.buffer.pop_front() {
                return Ok(Some(item));
            }
            match self.next_page().await? {
                Some(page) => self.buffer.extend(page),
                None => return Ok(None),
            }
        }
    }
}
/// Creates a [`HistoryPaginator`] whose pages come from
/// `client.flow().query_history(..)`.
///
/// Each fetch sends the current offset and `page_size` as decimal strings,
/// together with every field of `filter`. The response's `actions` and
/// `total` are extracted with `require!` (presumably failing when a field is
/// absent — see that macro for the exact error).
///
/// Nothing is requested until the paginator is first polled.
#[cfg(not(feature = "dynamic"))]
pub fn flow_history<'a>(
    client: &'a crate::NifiClient,
    filter: HistoryFilter,
    page_size: u32,
) -> HistoryPaginator<impl FnMut(u32, u32) -> BoxedFetchFuture<'a> + 'a> {
    use crate::require;

    HistoryPaginator::from_fetcher(
        move |offset: u32, count: u32| -> BoxedFetchFuture<'a> {
            // The future may outlive this closure call, so it gets its own
            // owned copy of the filter.
            let criteria = filter.clone();
            Box::pin(async move {
                let (offset_param, count_param) = (offset.to_string(), count.to_string());
                let response = client
                    .flow()
                    .query_history(
                        &offset_param,
                        &count_param,
                        criteria.sort_column.as_deref(),
                        criteria.sort_order.as_deref(),
                        criteria.start_date.as_deref(),
                        criteria.end_date.as_deref(),
                        criteria.user_identity.as_deref(),
                        criteria.source_id.as_deref(),
                    )
                    .await?;
                let actions = require!(response.actions).clone();
                let total = *require!(response.total);
                Ok(HistoryPage { actions, total })
            })
        },
        page_size,
    )
}
/// Creates a [`HistoryPaginator`] whose pages come from the dynamic client's
/// `client.flow().query_history(..)`.
///
/// Each fetch sends the current offset and `page_size` as decimal strings,
/// together with every field of `filter`. The response's `actions` and
/// `total` are extracted with `require!` (presumably failing when a field is
/// absent — see that macro for the exact error).
///
/// Nothing is requested until the paginator is first polled.
#[cfg(feature = "dynamic")]
pub fn flow_history_dynamic<'a>(
    client: &'a crate::dynamic::DynamicClient,
    filter: HistoryFilter,
    page_size: u32,
) -> HistoryPaginator<impl FnMut(u32, u32) -> BoxedFetchFuture<'a> + 'a> {
    use crate::require;

    HistoryPaginator::from_fetcher(
        move |offset: u32, count: u32| -> BoxedFetchFuture<'a> {
            // The future may outlive this closure call, so it gets its own
            // owned copy of the filter.
            let criteria = filter.clone();
            Box::pin(async move {
                let (offset_param, count_param) = (offset.to_string(), count.to_string());
                let response = client
                    .flow()
                    .query_history(
                        &offset_param,
                        &count_param,
                        criteria.sort_column.as_deref(),
                        criteria.sort_order.as_deref(),
                        criteria.start_date.as_deref(),
                        criteria.end_date.as_deref(),
                        criteria.user_identity.as_deref(),
                        criteria.source_id.as_deref(),
                    )
                    .await?;
                let actions = require!(response.actions).clone();
                let total = *require!(response.total);
                Ok(HistoryPage { actions, total })
            })
        },
        page_size,
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Builds an otherwise-default action carrying `id`.
    fn make_action(id: i32) -> ActionEntity {
        ActionEntity {
            id: Some(id),
            ..Default::default()
        }
    }

    /// Returns a fetcher serving ids `0..total` in `count`-sized slices, plus
    /// a shared counter of how many times the fetcher was invoked.
    fn fake_fetcher(
        total: i32,
    ) -> (
        impl FnMut(u32, u32) -> BoxedFetchFuture<'static>,
        Arc<AtomicUsize>,
    ) {
        let counter = Arc::new(AtomicUsize::new(0));
        let seen = Arc::clone(&counter);
        let fetch = move |offset: u32, count: u32| -> BoxedFetchFuture<'static> {
            seen.fetch_add(1, Ordering::SeqCst);
            let first = offset as i32;
            let last = first.saturating_add(count as i32).min(total);
            let actions: Vec<ActionEntity> = if first < total {
                (first..last).map(make_action).collect()
            } else {
                Vec::new()
            };
            Box::pin(async move { Ok(HistoryPage { actions, total }) })
        };
        (fetch, counter)
    }

    #[tokio::test]
    async fn next_page_walks_all_pages_then_returns_none() {
        let (fetch, calls) = fake_fetcher(250);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);
        for expected_len in [100, 100, 50] {
            let page = pag.next_page().await.unwrap().unwrap();
            assert_eq!(page.len(), expected_len);
        }
        assert!(pag.next_page().await.unwrap().is_none());
        // The short third page marks exhaustion, so no fourth fetch happens.
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn next_page_short_page_terminates() {
        let (fetch, calls) = fake_fetcher(150);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);
        for expected_len in [100, 50] {
            let page = pag.next_page().await.unwrap().unwrap();
            assert_eq!(page.len(), expected_len);
        }
        assert!(pag.next_page().await.unwrap().is_none());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn next_page_empty_first_response_returns_none() {
        let (fetch, calls) = fake_fetcher(0);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);
        for _ in 0..2 {
            assert!(pag.next_page().await.unwrap().is_none());
        }
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn next_page_is_idempotent_after_exhaustion() {
        let (fetch, calls) = fake_fetcher(50);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);
        let only_page = pag.next_page().await.unwrap().unwrap();
        assert_eq!(only_page.len(), 50);
        for _ in 0..3 {
            assert!(pag.next_page().await.unwrap().is_none());
        }
        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "fetcher must not be called after exhaustion"
        );
    }

    #[tokio::test]
    async fn next_page_does_not_advance_on_error() {
        // Only the closure reads the counter, so it does not need sharing.
        let attempts = AtomicUsize::new(0);
        let fetch = move |offset: u32, count: u32| -> BoxedFetchFuture<'static> {
            let attempt = attempts.fetch_add(1, Ordering::SeqCst);
            let actions: Vec<ActionEntity> = (offset..offset + count)
                .map(|i| make_action(i as i32))
                .collect();
            Box::pin(async move {
                // Fail exactly the second call (zero-based index 1).
                if attempt == 1 {
                    Err(NifiError::Unauthorized {
                        message: "simulated".to_string(),
                    })
                } else {
                    Ok(HistoryPage {
                        actions,
                        total: 300,
                    })
                }
            })
        };
        let mut pag = HistoryPaginator::from_fetcher(fetch, 100);
        let first = pag.next_page().await.unwrap().unwrap();
        assert_eq!(first.len(), 100);
        assert!(pag.next_page().await.is_err());
        // The retry must resume at offset 100, not skip ahead to 200.
        let retried = pag.next_page().await.unwrap().unwrap();
        assert_eq!(retried.first().and_then(|a| a.id), Some(100));
    }

    #[tokio::test]
    async fn next_page_offset_overflow_saturates() {
        // Invocations are counted but never asserted on in this test.
        let attempts = AtomicUsize::new(0);
        let page_len = 100_000_u32;
        let fetch = move |offset: u32, _count: u32| -> BoxedFetchFuture<'static> {
            attempts.fetch_add(1, Ordering::SeqCst);
            let actions: Vec<ActionEntity> = (0..page_len)
                .map(|i| make_action((offset as i32).wrapping_add(i as i32)))
                .collect();
            Box::pin(async move {
                Ok(HistoryPage {
                    actions,
                    total: i32::MAX,
                })
            })
        };
        let mut pag = HistoryPaginator::from_fetcher(fetch, page_len);
        let mut seen_pages = 0_usize;
        while pag.next_page().await.unwrap().is_some() {
            seen_pages += 1;
            assert!(seen_pages < 25_000, "paginator failed to terminate");
        }
    }

    #[tokio::test]
    async fn item_next_buffers_pages_and_yields_all() {
        let (fetch, calls) = fake_fetcher(5);
        let mut pag = HistoryPaginator::from_fetcher(fetch, 2);
        let mut ids = Vec::new();
        while let Some(action) = pag.next().await.unwrap() {
            ids.push(action.id.unwrap());
        }
        assert_eq!(ids, [0, 1, 2, 3, 4]);
        // Pages of 2, 2 and 1 items: three fetches in total.
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}