use std::{
mem,
ops::{Deref, DerefMut},
sync::Arc,
};
use futures::{
StreamExt,
stream::{BoxStream, unfold},
};
use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Serializer};
use crate::{
Result,
error::AnytypeError,
http_client::{HttpClient, HttpRequest},
};
/// One page of a paginated listing, optionally paired with what is needed to
/// request the pages that follow it.
///
/// The wrapped [`PaginatedResponse`] is reachable through `Deref`/`DerefMut`,
/// and serialization emits only that response.
#[derive(Debug)]
pub struct PagedResult<T> {
/// The page currently held in memory.
response: PaginatedResponse<T>,
/// Client/request pair used to fetch further pages; `None` when this result
/// was built from a standalone response or a plain list of items.
refill: Option<Refill>,
}
/// Everything required to re-issue the request that produced a page.
#[derive(Debug, Clone)]
struct Refill {
/// Shared HTTP client the follow-up requests are sent through.
client: Arc<HttpClient>,
/// Request that is re-sent with updated pagination to obtain the next page.
request: HttpRequest,
}
impl<T> PagedResult<T> {
pub(crate) fn new(
response: PaginatedResponse<T>,
client: Arc<HttpClient>,
request: HttpRequest,
) -> Self {
Self {
response,
refill: Some(Refill { client, request }),
}
}
fn single_page(response: PaginatedResponse<T>) -> Self {
Self {
response,
refill: None,
}
}
pub(crate) fn from_items(items: Vec<T>) -> Self {
let total = items.len();
let response = PaginatedResponse {
items,
pagination: PaginationMeta {
has_more: false,
#[allow(clippy::cast_possible_truncation)]
limit: (total & (u32::MAX as usize)) as u32,
offset: 0,
total,
},
};
Self::single_page(response)
}
#[allow(dead_code)]
pub(crate) fn from_response(response: PaginatedResponse<T>) -> Self {
Self::single_page(response)
}
pub fn into_response(self) -> PaginatedResponse<T> {
self.response
}
}
/// Read access to the wrapped page: fields and methods of
/// [`PaginatedResponse`] can be used directly on a `PagedResult`.
impl<T> Deref for PagedResult<T> {
    type Target = PaginatedResponse<T>;

    fn deref(&self) -> &PaginatedResponse<T> {
        let Self { response, .. } = self;
        response
    }
}
impl<T> DerefMut for PagedResult<T> {
fn deref_mut(&mut self) -> &mut PaginatedResponse<T> {
&mut self.response
}
}
impl<T: Serialize> Serialize for PagedResult<T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.response.serialize(serializer)
}
}
#[allow(clippy::type_complexity)]
fn next_response_iter<T>(
next_response: PaginatedResponse<T>,
limit: u32,
refill: Refill,
) -> Option<(
Result<T, AnytypeError>,
(std::vec::IntoIter<T>, bool, u32, u32, Option<Refill>, bool),
)> {
let new_has_more = next_response.pagination.has_more;
let new_offset = next_response.pagination.offset + next_response.pagination.limit;
let mut new_items = next_response.items.into_iter();
new_items.next().map(|item: T| {
(
Ok::<T, AnytypeError>(item),
(
new_items,
new_has_more,
new_offset,
limit,
Some(refill),
false,
),
)
})
}
impl<T: DeserializeOwned + Send + 'static> PagedResult<T> {
pub fn into_stream(self) -> BoxStream<'static, Result<T>> {
let response = self.response;
let refill = self.refill;
let current_items = response.items.into_iter();
let has_more = response.pagination.has_more;
let offset = response.pagination.offset + response.pagination.limit;
let limit = response.pagination.limit;
unfold(
(current_items, has_more, offset, limit, refill, false),
move |(mut items, has_more, offset, limit, refill, mut errored)| async move {
if errored {
return None;
}
if let Some(item) = items.next() {
return Some((
Ok(item),
(items, has_more, offset, limit, refill.clone(), false),
));
}
if has_more && let Some(refill) = refill {
let next_request = refill.request.with_pagination(offset, limit);
match refill
.client
.send::<PaginatedResponse<T>>(next_request.clone())
.await
{
Ok(next_response) => next_response_iter(
next_response,
limit,
Refill {
client: refill.client.clone(),
request: next_request,
},
),
Err(err) => {
errored = true;
Some((
Err(err),
(items, false, offset, limit, Some(refill), errored),
))
}
}
} else {
None
}
},
)
.boxed()
}
pub async fn collect_all(self) -> Result<Vec<T>> {
let mut stream = self.into_stream();
let mut items = Vec::new();
while let Some(result) = stream.next().await {
items.push(result?);
}
Ok(items)
}
}
/// Borrowing iteration over the items of the page currently held; no further
/// pages are fetched.
impl<'a, T> IntoIterator for &'a PagedResult<T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    fn into_iter(self) -> std::slice::Iter<'a, T> {
        let page_items = self.response.items.as_slice();
        page_items.iter()
    }
}
/// Pagination metadata with `usize`-typed `limit` and `offset`.
///
/// NOTE(review): this has the same field names as `PaginationMeta` but is not
/// referenced anywhere in the visible part of this file — confirm whether it
/// is still needed by other modules.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PaginationResponse {
/// Whether more results exist beyond this page.
pub has_more: bool,
/// Page size.
pub limit: usize,
/// Position of the page within the full result set.
pub offset: usize,
/// Result count as reported alongside the page.
pub total: usize,
}
/// A single page of items together with its pagination metadata.
#[derive(Debug, Deserialize, Serialize)]
pub struct PaginatedResponse<T> {
/// Items on this page. When deserializing, the field may also be named
/// `data`, and a missing field yields an empty vector.
#[serde(default = "Vec::new", alias = "data")]
pub items: Vec<T>,
/// Position and size information for this page.
pub pagination: PaginationMeta,
}
impl<T> PaginatedResponse<T> {
    /// Number of items on this page (not the overall total).
    #[must_use]
    pub fn len(&self) -> usize {
        self.items.as_slice().len()
    }

    /// Returns `true` when this page holds no items.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.items.as_slice().is_empty()
    }

    /// Mutable iterator over the items on this page.
    pub fn iter_mut(&mut self) -> std::slice::IterMut<'_, T> {
        self.items.as_mut_slice().iter_mut()
    }

    /// Moves the items out of the page, leaving an empty vector behind.
    /// The pagination metadata is not modified.
    pub fn take_items(&mut self) -> Vec<T> {
        let slot = &mut self.items;
        mem::take(slot)
    }
}
impl<'a, T> PaginatedResponse<T> {
    /// Shared iterator over the items on this page.
    pub fn iter(&'a self) -> std::slice::Iter<'a, T> {
        let page_items: &'a [T] = self.items.as_slice();
        page_items.iter()
    }
}
/// Lets `for item in &page` walk the items of a page by shared reference.
impl<'a, T> IntoIterator for &'a PaginatedResponse<T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    fn into_iter(self) -> std::slice::Iter<'a, T> {
        let page_items = self.items.as_slice();
        page_items.iter()
    }
}
/// Lets `for item in &mut page` walk the items of a page by mutable reference.
///
/// Yields `&mut T`, matching `iter_mut`, so items can be edited in place.
impl<'a, T> IntoIterator for &'a mut PaginatedResponse<T> {
    type Item = &'a mut T;
    type IntoIter = std::slice::IterMut<'a, T>;

    fn into_iter(self) -> Self::IntoIter {
        self.items.iter_mut()
    }
}
/// Pagination metadata attached to every [`PaginatedResponse`].
#[derive(Debug, Deserialize, Serialize)]
pub struct PaginationMeta {
/// Whether more results exist beyond this page; streaming only requests a
/// further page when this is `true`.
pub has_more: bool,
/// Page size; reused as the limit of follow-up requests.
pub limit: u32,
/// Position of this page; the next page is requested at `offset + limit`.
pub offset: u32,
/// Result count as reported alongside the page (for locally built results,
/// the number of items supplied).
pub total: usize,
}
// Unit tests for the paging wrapper. All of them work on locally built,
// single-page results, so no HTTP traffic is involved.
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use crate::{objects::Object, paged::*};
// Minimal item type used as the page payload in these tests.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct TestItem {
id: String,
name: String,
}
// Builds a GET request to `/test` that already carries `limit` and `offset`
// query parameters.
fn create_test_request() -> HttpRequest {
HttpRequest {
method: reqwest::Method::GET,
path: "/test".to_string(),
query: vec![
("limit".to_string(), "10".to_string()),
("offset".to_string(), "0".to_string()),
],
body: None,
}
}
// Fields of the inner response are reachable directly through `Deref`.
#[test]
fn test_deref_to_paginated_response() {
let items = vec![
TestItem {
id: "1".to_string(),
name: "Item 1".to_string(),
},
TestItem {
id: "2".to_string(),
name: "Item 2".to_string(),
},
];
let paged = PagedResult::from_items(items);
assert_eq!(paged.items.len(), 2);
assert_eq!(paged.items[0].id, "1");
assert_eq!(paged.items[1].name, "Item 2");
assert_eq!(paged.pagination.total, 2);
assert!(!paged.pagination.has_more);
assert_eq!(paged.pagination.offset, 0);
}
// `len` and `is_empty` of the inner response are callable on the wrapper.
#[test]
fn test_deref_len_and_is_empty() {
let items = vec![TestItem {
id: "1".to_string(),
name: "Item 1".to_string(),
}];
let paged = PagedResult::from_items(items);
assert_eq!(paged.len(), 1);
assert!(!paged.is_empty());
let empty_paged = PagedResult::<Object>::from_items(vec![]);
assert_eq!(empty_paged.len(), 0);
assert!(empty_paged.is_empty());
}
// The derived `Debug` output names the wrapper, its field and the item type.
#[test]
fn test_debug_implementation() {
let items = vec![TestItem {
id: "1".to_string(),
name: "Test".to_string(),
}];
let paged = PagedResult::from_items(items);
let debug_str = format!("{paged:?}");
assert!(debug_str.contains("PagedResult"));
assert!(debug_str.contains("response"));
assert!(debug_str.contains("TestItem"));
}
// Serialization emits the inner response (`items` and `pagination`).
#[test]
fn test_serialize_implementation() {
let items = vec![
TestItem {
id: "1".to_string(),
name: "Item 1".to_string(),
},
TestItem {
id: "2".to_string(),
name: "Item 2".to_string(),
},
];
let paged = PagedResult::from_items(items);
let json = serde_json::to_string(&paged).expect("Failed to serialize");
let parsed: serde_json::Value = serde_json::from_str(&json).expect("Failed to parse JSON");
assert!(parsed["items"].is_array());
assert_eq!(parsed["items"].as_array().unwrap().len(), 2);
assert_eq!(parsed["items"][0]["id"], "1");
assert_eq!(parsed["items"][1]["name"], "Item 2");
assert_eq!(parsed["pagination"]["total"], 2);
assert_eq!(parsed["pagination"]["has_more"], false);
}
// `for item in &paged` visits the items of the current page in order.
#[test]
fn test_into_iterator_for_reference() {
let items = vec![
TestItem {
id: "1".to_string(),
name: "Item 1".to_string(),
},
TestItem {
id: "2".to_string(),
name: "Item 2".to_string(),
},
TestItem {
id: "3".to_string(),
name: "Item 3".to_string(),
},
];
let paged = PagedResult::from_items(items);
let mut collected: Vec<&TestItem> = Vec::new();
for item in &paged {
collected.push(item);
}
assert_eq!(collected.len(), 3);
assert_eq!(collected[0].id, "1");
assert_eq!(collected[1].id, "2");
assert_eq!(collected[2].id, "3");
}
// `iter()` is reachable through `Deref` and yields items in order.
#[test]
fn test_iter_method() {
let items = vec![
TestItem {
id: "1".to_string(),
name: "Item 1".to_string(),
},
TestItem {
id: "2".to_string(),
name: "Item 2".to_string(),
},
];
let paged = PagedResult::from_items(items);
let names: Vec<&str> = paged.iter().map(|item| item.name.as_str()).collect();
assert_eq!(names, vec!["Item 1", "Item 2"]);
}
// `with_pagination` sets the requested offset and limit on the query.
#[test]
fn test_http_request_with_pagination() {
let request = create_test_request();
let new_request = request.with_pagination(20, 15);
let offset = new_request.query.iter().find(|(key, _)| key == "offset");
let limit = new_request.query.iter().find(|(key, _)| key == "limit");
assert_eq!(offset, Some(&("offset".to_string(), "20".to_string())));
assert_eq!(limit, Some(&("limit".to_string(), "15".to_string())));
}
// `with_pagination` replaces existing offset/limit entries instead of
// appending duplicates, and leaves unrelated query parameters alone.
#[test]
fn test_http_request_with_pagination_replaces_existing() {
let mut request = create_test_request();
request
.query
.push(("filter".to_string(), "active".to_string()));
let new_request = request.with_pagination(30, 25);
let filter = new_request.query.iter().find(|(key, _)| key == "filter");
assert_eq!(filter, Some(&("filter".to_string(), "active".to_string())));
let offsets: Vec<_> = new_request
.query
.iter()
.filter(|(key, _)| key == "offset")
.collect();
let limits: Vec<_> = new_request
.query
.iter()
.filter(|(key, _)| key == "limit")
.collect();
assert_eq!(offsets.len(), 1);
assert_eq!(limits.len(), 1);
assert_eq!(offsets[0].1, "30");
assert_eq!(limits[0].1, "25");
}
// Streaming a single-page result yields exactly its items, in order.
#[tokio::test]
async fn test_into_stream_single_page() {
let items = vec![
TestItem {
id: "1".to_string(),
name: "Item 1".to_string(),
},
TestItem {
id: "2".to_string(),
name: "Item 2".to_string(),
},
];
let paged = PagedResult::from_items(items.clone());
let mut stream = paged.into_stream();
let mut collected = Vec::new();
while let Some(result) = stream.next().await {
collected.push(result.expect("Expected Ok item"));
}
assert_eq!(collected.len(), 2);
assert_eq!(collected[0].id, "1");
assert_eq!(collected[1].id, "2");
}
// Streaming an empty result ends immediately without yielding anything.
#[tokio::test]
async fn test_into_stream_empty_page() {
let paged = PagedResult::<Object>::from_items(vec![]);
let mut stream = paged.into_stream();
let mut count = 0;
while let Some(result) = stream.next().await {
result.expect("Expected Ok item");
count += 1;
}
assert_eq!(count, 0);
}
// `collect_all` on a single-page result returns all of its items, in order.
#[tokio::test]
async fn test_collect_all_single_page() {
let items = vec![
TestItem {
id: "1".to_string(),
name: "Item 1".to_string(),
},
TestItem {
id: "2".to_string(),
name: "Item 2".to_string(),
},
TestItem {
id: "3".to_string(),
name: "Item 3".to_string(),
},
];
let paged = PagedResult::from_items(items.clone());
let all_items = paged
.collect_all()
.await
.expect("collect_all should succeed");
assert_eq!(all_items.len(), 3);
assert_eq!(all_items[0].id, "1");
assert_eq!(all_items[1].id, "2");
assert_eq!(all_items[2].id, "3");
}
// `collect_all` on an empty result succeeds with an empty vector.
#[tokio::test]
async fn test_collect_all_empty() {
let paged = PagedResult::<Object>::from_items(vec![]);
let all_items = paged
.collect_all()
.await
.expect("collect_all should succeed");
assert!(all_items.is_empty());
}
}