use crate::cache::models::{CachedSpec, PaginationStrategy};
use crate::constants;
use crate::engine::executor;
use crate::error::Error;
use crate::invocation::{ExecutionContext, ExecutionResult, OperationCall};
use serde_json::Value;
use std::collections::HashMap;
/// Safety cap on pages fetched in one paginated run; prevents an infinite
/// loop against a server that always advertises another page.
const MAX_PAGES: usize = 1000;
/// Well-known wrapper field names probed (in this order) when a page
/// response is a JSON object rather than a bare array.
const DATA_ARRAY_FIELDS: &[&str] = &["data", "items", "results", "entries", "records", "content"];
/// Repeatedly executes `call` against `spec`, following the operation's
/// declared pagination strategy, and streams every item as one JSON line
/// (JSONL) to `writer`.
///
/// Returns the total number of items written. The loop ends after
/// `MAX_PAGES` pages, on a dry run, on an empty result, or when the active
/// strategy reports no further page.
///
/// # Errors
/// Fails when the operation id is unknown, a page body is not valid JSON,
/// serialization of an item fails, or writing to `writer` fails.
#[allow(clippy::too_many_lines)]
pub async fn execute_paginated(
    spec: &CachedSpec,
    mut call: OperationCall,
    ctx: ExecutionContext,
    writer: &mut impl std::io::Write,
) -> Result<u64, Error> {
    // Look up the operation's cached metadata (pagination hints live there).
    let operation = spec
        .commands
        .iter()
        .find(|c| c.operation_id == call.operation_id)
        .ok_or_else(|| Error::operation_not_found(&call.operation_id))?;
    let strategy = operation.pagination.strategy;
    if matches!(strategy, PaginationStrategy::None) {
        // Still executes once below; the warning just explains the single page.
        tracing::warn!(
            operation_id = %call.operation_id,
            "No pagination metadata detected for this operation; executing once. \
            Consider adding x-aperture-pagination to the spec."
        );
    }
    // Cursor strategy inputs: the response field carrying the next cursor,
    // and the query parameter to echo it back through (defaults to the
    // field name when the spec names no parameter).
    let cursor_field = operation.pagination.cursor_field.clone();
    let cursor_param = operation
        .pagination
        .cursor_param
        .clone()
        .or_else(|| cursor_field.clone());
    // Offset strategy inputs: when the spec is silent, sniff well-known
    // parameter names already present on the call.
    let page_param = operation
        .pagination
        .page_param
        .clone()
        .unwrap_or_else(|| detect_page_param(&call.query_params));
    let limit_param = operation
        .pagination
        .limit_param
        .clone()
        .unwrap_or_else(|| detect_limit_param(&call.query_params));
    // Page size used to judge whether a page is "full" (more may follow).
    // 20 is an assumed server default — TODO confirm against target APIs.
    let limit: usize = call
        .query_params
        .get(&limit_param)
        .and_then(|v| v.parse().ok())
        .unwrap_or(20);
    let mut total_items: u64 = 0;
    // Bounded by MAX_PAGES as a safety net against endless "next" links.
    for _page_num in 0..MAX_PAGES {
        let result = executor::execute(spec, call.clone(), ctx.clone()).await?;
        let (body, response_headers) = match result {
            ExecutionResult::Success { body, headers, .. } => (body, headers),
            // Cached responses carry no headers, so LinkHeader pagination
            // necessarily stops after this page.
            ExecutionResult::Cached { body } => (body, HashMap::new()),
            ExecutionResult::DryRun { request_info } => {
                // A dry run prints the would-be request once and stops.
                let line = serde_json::to_string(&request_info).map_err(|e| {
                    Error::serialization_error(format!("Failed to serialize dry-run info: {e}"))
                })?;
                writeln!(writer, "{line}")
                    .map_err(|e| Error::io_error(format!("Failed to write output: {e}")))?;
                break;
            }
            ExecutionResult::Empty => break,
        };
        let json: Value = serde_json::from_str(&body).map_err(|e| {
            Error::invalid_json_body(format!("Page response is not valid JSON: {e}"))
        })?;
        // Emit each item of this page as its own JSON line.
        let items = extract_items(&json);
        let page_len = items.len();
        for item in items {
            let line = serde_json::to_string(item).map_err(|e| {
                Error::serialization_error(format!("Failed to serialize item: {e}"))
            })?;
            writeln!(writer, "{line}")
                .map_err(|e| Error::io_error(format!("Failed to write output: {e}")))?;
            total_items += 1;
        }
        // Mutates `call` in place to target the next page; false means done.
        let has_next = advance_cursor(
            strategy,
            &mut call,
            &json,
            &response_headers,
            cursor_field.as_ref(),
            cursor_param.as_ref(),
            &page_param,
            page_len,
            limit,
        );
        if !has_next {
            break;
        }
    }
    Ok(total_items)
}
/// Mutates `call` so the next request targets the following page, using the
/// operation's pagination strategy.
///
/// Returns `true` when another page should be fetched, `false` when
/// pagination is exhausted or disabled for this operation.
#[allow(clippy::too_many_arguments)]
fn advance_cursor(
    strategy: PaginationStrategy,
    call: &mut OperationCall,
    json: &Value,
    response_headers: &HashMap<String, String>,
    cursor_field: Option<&String>,
    cursor_param: Option<&String>,
    page_param: &str,
    page_len: usize,
    limit: usize,
) -> bool {
    match strategy {
        PaginationStrategy::Cursor => {
            advance_cursor_strategy(call, json, cursor_field, cursor_param)
        }
        PaginationStrategy::Offset => {
            advance_offset_strategy(call, page_param, page_len, limit)
        }
        PaginationStrategy::LinkHeader => {
            advance_link_header_strategy(call, response_headers)
        }
        PaginationStrategy::None => false,
    }
}
/// Cursor strategy: pull the next cursor out of the response body and echo
/// it back as a query parameter on the following request.
///
/// Falls back to the `next_cursor` field when the spec names none, and
/// defaults the query parameter to the field name. Returns `false` when no
/// usable (non-empty) cursor is present, ending pagination.
fn advance_cursor_strategy(
    call: &mut OperationCall,
    json: &Value,
    cursor_field: Option<&String>,
    cursor_param: Option<&String>,
) -> bool {
    let field = match cursor_field {
        Some(f) => f.as_str(),
        None => "next_cursor",
    };
    let param = match cursor_param {
        Some(p) => p.as_str(),
        None => field,
    };
    let Some(cursor) = extract_cursor_value(json, field) else {
        return false;
    };
    if cursor.is_empty() {
        return false;
    }
    call.query_params.insert(param.to_string(), cursor);
    true
}
/// Offset/page-number strategy: bump the paging parameter for the next
/// request.
///
/// A short or empty page (`page_len < limit`) is taken as the final page
/// and stops pagination. `offset`/`skip` parameters count records and
/// advance by `page_len` from a default of 0; any other parameter is a
/// page number and advances by 1 from a default of 1.
fn advance_offset_strategy(
    call: &mut OperationCall,
    page_param: &str,
    page_len: usize,
    limit: usize,
) -> bool {
    if page_len == 0 || page_len < limit {
        return false;
    }
    let record_based = matches!(page_param, "offset" | "skip");
    let default_start = if record_based { 0 } else { 1 };
    let current: usize = call
        .query_params
        .get(page_param)
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default_start);
    let step = if record_based { page_len } else { 1 };
    call.query_params
        .insert(page_param.to_string(), (current + step).to_string());
    true
}
/// Link-header strategy: follow the `rel="next"` target advertised by the
/// response's `Link` header (header name matched case-insensitively).
fn advance_link_header_strategy(
    call: &mut OperationCall,
    response_headers: &HashMap<String, String>,
) -> bool {
    let mut link_value = "";
    for (name, value) in response_headers {
        if name.to_lowercase() == constants::HEADER_LINK {
            link_value = value.as_str();
            break;
        }
    }
    match parse_link_next(link_value) {
        Some(next_url) => apply_next_url(call, &next_url),
        None => false,
    }
}
/// Pulls the list of records out of a page response.
///
/// Accepts a bare JSON array, an object wrapping the array under one of the
/// well-known `DATA_ARRAY_FIELDS` names, or — as a last resort — a single
/// object, which is treated as a one-item page. Scalars and null yield an
/// empty list.
fn extract_items(json: &Value) -> Vec<&Value> {
    if let Value::Array(arr) = json {
        return arr.iter().collect();
    }
    if json.is_object() {
        for field in DATA_ARRAY_FIELDS {
            if let Some(Value::Array(arr)) = json.get(*field) {
                return arr.iter().collect();
            }
        }
        // No known wrapper field: the object itself is the single item.
        return vec![json];
    }
    Vec::new()
}
/// Resolves a (possibly dotted) field path like `"page.next_cursor"` inside
/// `json` and renders the value as a cursor string.
///
/// Returns `None` for missing paths, empty strings, and any non-string,
/// non-number value (e.g. `null`).
fn extract_cursor_value(json: &Value, field: &str) -> Option<String> {
    // Walk the dotted path down the tree; any missing segment aborts.
    let node = field
        .split('.')
        .try_fold(json, |current, key| current.get(key))?;
    match node {
        Value::String(s) if !s.is_empty() => Some(s.clone()),
        Value::Number(n) => Some(n.to_string()),
        _ => None,
    }
}
/// Extracts the URL tagged `rel="next"` from an RFC 8288 `Link` header
/// value, e.g. `<https://api/items?page=2>; rel="next"`.
///
/// Scans `<url>` spans directly instead of splitting the whole header on
/// commas, so next URLs that themselves contain commas (e.g. `?ids=1,2`)
/// are parsed correctly. Matching of the `rel` parameter is
/// case-insensitive and accepts both the quoted and unquoted forms.
///
/// Returns `None` when no `next` relation is present or the header is
/// malformed.
#[must_use]
pub fn parse_link_next(header_value: &str) -> Option<String> {
    let mut remaining = header_value;
    while let Some(open) = remaining.find('<') {
        let after_open = &remaining[open + 1..];
        // A '<' with no matching '>' means the rest is malformed; give up.
        let Some(close) = after_open.find('>') else {
            break;
        };
        let url = &after_open[..close];
        let tail = &after_open[close + 1..];
        // This link's parameters run up to the comma that starts the next
        // link-value (or to the end of the header).
        let (params, rest) = match tail.find(',') {
            Some(comma) => (&tail[..comma], &tail[comma + 1..]),
            None => (tail, ""),
        };
        let is_next = params.split(';').any(|seg| {
            let seg = seg.trim().to_lowercase();
            seg == r#"rel="next""# || seg == "rel=next"
        });
        if is_next {
            return Some(url.to_string());
        }
        remaining = rest;
    }
    None
}
/// Replaces `call`'s query parameters with those parsed from the
/// `rel="next"` URL.
///
/// Returns `false` (stop paginating) when the URL carries no `?` query
/// string or the query string yields no parameters, since re-issuing the
/// unchanged request would loop on the same page.
fn apply_next_url(call: &mut OperationCall, next_url: &str) -> bool {
    let Some(question_mark) = next_url.find('?') else {
        tracing::warn!(
            next_url,
            "Link next URL has no query string; stopping pagination"
        );
        return false;
    };
    let mut next_params = HashMap::new();
    for pair in next_url[question_mark + 1..].split('&') {
        // "key=value" or a bare "key" (value defaults to empty).
        let (raw_key, raw_value) = pair.split_once('=').unwrap_or((pair, ""));
        if raw_key.is_empty() {
            continue;
        }
        next_params.insert(
            urlencoding::decode(raw_key).unwrap_or_default().into_owned(),
            urlencoding::decode(raw_value).unwrap_or_default().into_owned(),
        );
    }
    if next_params.is_empty() {
        return false;
    }
    call.query_params = next_params;
    true
}
fn detect_page_param(params: &HashMap<String, String>) -> String {
constants::PAGINATION_PAGE_PARAMS
.iter()
.find(|&&p| params.contains_key(p))
.map_or("page", |&p| p)
.to_string()
}
fn detect_limit_param(params: &HashMap<String, String>) -> String {
constants::PAGINATION_LIMIT_PARAMS
.iter()
.find(|&&p| params.contains_key(p))
.map_or("limit", |&p| p)
.to_string()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal `OperationCall` whose query parameters are seeded
    /// from `pairs` and whose remaining fields are empty defaults.
    fn call_with_query(pairs: &[(&str, &str)]) -> crate::invocation::OperationCall {
        crate::invocation::OperationCall {
            operation_id: "op".to_string(),
            path_params: HashMap::new(),
            query_params: pairs
                .iter()
                .map(|&(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            header_params: HashMap::new(),
            body: None,
            custom_headers: vec![],
        }
    }

    #[test]
    fn test_parse_link_next_returns_next_url() {
        let header = r#"<https://api.example.com/items?page=2>; rel="next", <https://api.example.com/items?page=10>; rel="last""#;
        let expected = "https://api.example.com/items?page=2".to_string();
        assert_eq!(parse_link_next(header), Some(expected));
    }

    #[test]
    fn test_parse_link_next_without_next_returns_none() {
        let header = r#"<https://api.example.com/items?page=10>; rel="last""#;
        assert!(parse_link_next(header).is_none());
    }

    #[test]
    fn test_parse_link_next_without_quotes() {
        let header = "<https://api.example.com/items?page=2>; rel=next";
        let expected = "https://api.example.com/items?page=2".to_string();
        assert_eq!(parse_link_next(header), Some(expected));
    }

    #[test]
    fn test_parse_link_next_empty_returns_none() {
        assert!(parse_link_next("").is_none());
    }

    #[test]
    fn test_extract_cursor_value_simple_field() {
        let body = serde_json::json!({"next_cursor": "abc123", "data": []});
        assert_eq!(
            extract_cursor_value(&body, "next_cursor").as_deref(),
            Some("abc123")
        );
    }

    #[test]
    fn test_extract_cursor_value_dotted_path() {
        let body = serde_json::json!({"page": {"next_cursor": "tok_xyz"}});
        assert_eq!(
            extract_cursor_value(&body, "page.next_cursor").as_deref(),
            Some("tok_xyz")
        );
    }

    #[test]
    fn test_extract_cursor_value_null_returns_none() {
        let body = serde_json::json!({"next_cursor": null});
        assert!(extract_cursor_value(&body, "next_cursor").is_none());
    }

    #[test]
    fn test_extract_cursor_value_empty_string_returns_none() {
        let body = serde_json::json!({"next_cursor": ""});
        assert!(extract_cursor_value(&body, "next_cursor").is_none());
    }

    #[test]
    fn test_extract_items_from_top_level_array() {
        let body = serde_json::json!([{"id": 1}, {"id": 2}]);
        assert_eq!(extract_items(&body).len(), 2);
    }

    #[test]
    fn test_extract_items_from_data_wrapper() {
        let body = serde_json::json!({"data": [{"id": 1}], "total": 1});
        assert_eq!(extract_items(&body).len(), 1);
    }

    #[test]
    fn test_extract_items_from_items_wrapper() {
        let body = serde_json::json!({"items": [{"id": 1}, {"id": 2}], "next_cursor": "abc"});
        assert_eq!(extract_items(&body).len(), 2);
    }

    #[test]
    fn test_extract_items_single_object_fallback() {
        let body = serde_json::json!({"id": 1, "name": "Alice"});
        assert_eq!(extract_items(&body).len(), 1);
    }

    #[test]
    fn test_extract_items_empty_array() {
        let body = serde_json::json!([]);
        assert!(extract_items(&body).is_empty());
    }

    #[test]
    fn test_advance_offset_strategy_increments_page_number() {
        let mut call = call_with_query(&[("page", "1")]);
        assert!(advance_offset_strategy(&mut call, "page", 10, 10));
        assert_eq!(call.query_params["page"], "2");
    }

    #[test]
    fn test_advance_offset_strategy_stops_on_partial_page() {
        let mut call = call_with_query(&[("page", "1")]);
        assert!(
            !advance_offset_strategy(&mut call, "page", 3, 10),
            "partial page should return false"
        );
    }

    #[test]
    fn test_advance_offset_strategy_skip_advances_by_page_len() {
        let mut call = call_with_query(&[("skip", "0")]);
        assert!(advance_offset_strategy(&mut call, "skip", 10, 10));
        assert_eq!(call.query_params["skip"], "10");
        assert!(advance_offset_strategy(&mut call, "skip", 10, 10));
        assert_eq!(call.query_params["skip"], "20");
    }

    #[test]
    fn test_advance_offset_strategy_offset_advances_by_page_len() {
        let mut call = call_with_query(&[("offset", "0")]);
        assert!(advance_offset_strategy(&mut call, "offset", 5, 5));
        assert_eq!(call.query_params["offset"], "5");
    }
}