1use crate::cache::models::{CachedSpec, PaginationStrategy};
16use crate::constants;
17use crate::engine::executor;
18use crate::error::Error;
19use crate::invocation::{ExecutionContext, ExecutionResult, OperationCall};
20use serde_json::Value;
21use std::collections::HashMap;
22
/// Upper bound on the number of page requests a single `execute_paginated`
/// call will issue (it is the bound of that function's page loop).
const MAX_PAGES: usize = 1000;

/// Object keys probed, in this order, by `extract_items` when it looks for the
/// array of items inside a JSON object response.
const DATA_ARRAY_FIELDS: &[&str] = &["data", "items", "results", "entries", "records", "content"];
29
30#[allow(clippy::too_many_lines)]
42pub async fn execute_paginated(
43 spec: &CachedSpec,
44 mut call: OperationCall,
45 ctx: ExecutionContext,
46 writer: &mut impl std::io::Write,
47) -> Result<u64, Error> {
48 let operation = spec
49 .commands
50 .iter()
51 .find(|c| c.operation_id == call.operation_id)
52 .ok_or_else(|| Error::operation_not_found(&call.operation_id))?;
53
54 let strategy = operation.pagination.strategy;
55
56 if matches!(strategy, PaginationStrategy::None) {
57 tracing::warn!(
58 operation_id = %call.operation_id,
59 "No pagination metadata detected for this operation; executing once. \
60 Consider adding x-aperture-pagination to the spec."
61 );
62 }
63
64 let cursor_field = operation.pagination.cursor_field.clone();
65 let cursor_param = operation
66 .pagination
67 .cursor_param
68 .clone()
69 .or_else(|| cursor_field.clone());
70
71 let page_param = operation
72 .pagination
73 .page_param
74 .clone()
75 .unwrap_or_else(|| detect_page_param(&call.query_params));
76
77 let limit_param = operation
78 .pagination
79 .limit_param
80 .clone()
81 .unwrap_or_else(|| detect_limit_param(&call.query_params));
82
83 let limit: usize = call
84 .query_params
85 .get(&limit_param)
86 .and_then(|v| v.parse().ok())
87 .unwrap_or(20);
88
89 let mut total_items: u64 = 0;
90
91 for _page_num in 0..MAX_PAGES {
92 let result = executor::execute(spec, call.clone(), ctx.clone()).await?;
93
94 let (body, response_headers) = match result {
95 ExecutionResult::Success { body, headers, .. } => (body, headers),
96 ExecutionResult::Cached { body } => (body, HashMap::new()),
97 ExecutionResult::DryRun { request_info } => {
98 let line = serde_json::to_string(&request_info).map_err(|e| {
99 Error::serialization_error(format!("Failed to serialize dry-run info: {e}"))
100 })?;
101 writeln!(writer, "{line}")
102 .map_err(|e| Error::io_error(format!("Failed to write output: {e}")))?;
103 break;
104 }
105 ExecutionResult::Empty => break,
106 };
107
108 let json: Value = serde_json::from_str(&body).map_err(|e| {
109 Error::invalid_json_body(format!("Page response is not valid JSON: {e}"))
110 })?;
111
112 let items = extract_items(&json);
113 let page_len = items.len();
114
115 for item in items {
116 let line = serde_json::to_string(item).map_err(|e| {
117 Error::serialization_error(format!("Failed to serialize item: {e}"))
118 })?;
119 writeln!(writer, "{line}")
120 .map_err(|e| Error::io_error(format!("Failed to write output: {e}")))?;
121 total_items += 1;
122 }
123
124 let has_next = advance_cursor(
126 strategy,
127 &mut call,
128 &json,
129 &response_headers,
130 cursor_field.as_ref(),
131 cursor_param.as_ref(),
132 &page_param,
133 page_len,
134 limit,
135 );
136 if !has_next {
137 break;
138 }
139 }
140
141 Ok(total_items)
142}
143
144#[allow(clippy::too_many_arguments)]
149fn advance_cursor(
150 strategy: PaginationStrategy,
151 call: &mut OperationCall,
152 json: &Value,
153 response_headers: &HashMap<String, String>,
154 cursor_field: Option<&String>,
155 cursor_param: Option<&String>,
156 page_param: &str,
157 page_len: usize,
158 limit: usize,
159) -> bool {
160 match strategy {
161 PaginationStrategy::None => false,
162
163 PaginationStrategy::Cursor => {
164 advance_cursor_strategy(call, json, cursor_field, cursor_param)
165 }
166
167 PaginationStrategy::Offset => advance_offset_strategy(call, page_param, page_len, limit),
168
169 PaginationStrategy::LinkHeader => advance_link_header_strategy(call, response_headers),
170 }
171}
172
173fn advance_cursor_strategy(
176 call: &mut OperationCall,
177 json: &Value,
178 cursor_field: Option<&String>,
179 cursor_param: Option<&String>,
180) -> bool {
181 let field = cursor_field.map_or("next_cursor", String::as_str);
182 let param = cursor_param.map_or(field, String::as_str);
183 match extract_cursor_value(json, field) {
184 Some(c) if !c.is_empty() => {
185 call.query_params.insert(param.to_string(), c);
186 true
187 }
188 _ => false,
189 }
190}
191
192fn advance_offset_strategy(
198 call: &mut OperationCall,
199 page_param: &str,
200 page_len: usize,
201 limit: usize,
202) -> bool {
203 if page_len == 0 || page_len < limit {
204 return false;
205 }
206
207 let is_record_offset = page_param == "offset" || page_param == "skip";
208 let next_value = if is_record_offset {
209 let current: usize = call
210 .query_params
211 .get(page_param)
212 .and_then(|v| v.parse().ok())
213 .unwrap_or(0);
214 current + page_len
215 } else {
216 let current: usize = call
217 .query_params
218 .get(page_param)
219 .and_then(|v| v.parse().ok())
220 .unwrap_or(1);
221 current + 1
222 };
223
224 call.query_params
225 .insert(page_param.to_string(), next_value.to_string());
226 true
227}
228
229fn advance_link_header_strategy(
232 call: &mut OperationCall,
233 response_headers: &HashMap<String, String>,
234) -> bool {
235 let link_value = response_headers
236 .iter()
237 .find(|(k, _)| k.to_lowercase() == constants::HEADER_LINK)
238 .map_or("", |(_, v)| v.as_str());
239
240 parse_link_next(link_value).is_some_and(|next_url| apply_next_url(call, &next_url))
241}
242
243fn extract_items(json: &Value) -> Vec<&Value> {
250 match json {
251 Value::Array(arr) => arr.iter().collect(),
252 Value::Object(_) => {
253 for field in DATA_ARRAY_FIELDS {
254 if let Some(Value::Array(arr)) = json.get(*field) {
255 return arr.iter().collect();
256 }
257 }
258 std::slice::from_ref(json).iter().collect()
260 }
261 _ => vec![],
262 }
263}
264
265fn extract_cursor_value(json: &Value, field: &str) -> Option<String> {
271 let mut current = json;
272 for part in field.split('.') {
273 current = current.get(part)?;
274 }
275 match current {
276 Value::String(s) if !s.is_empty() => Some(s.clone()),
277 Value::Number(n) => Some(n.to_string()),
278 _ => None,
279 }
280}
281
/// Extracts the target URL of the `rel="next"` entry from a `Link` header
/// value.
///
/// The header is scanned link by link rather than split on `,`, so that:
///
/// * commas inside the `<...>` target (e.g. `?ids=1,2`) do not break the URL;
/// * `,` and `;` inside quoted parameter values are not treated as separators;
/// * `rel` may list several space-separated relation types (`rel="prev next"`);
/// * `rel`, its value, and surrounding whitespace are matched leniently
///   (`REL = "Next"` is accepted).
///
/// Returns `None` when the header is empty, malformed, or has no next link.
#[must_use]
pub fn parse_link_next(header_value: &str) -> Option<String> {
    let mut remaining = header_value;
    loop {
        // Each link starts with its `<target>`; stop when none is left.
        let target_start = remaining.find('<')? + 1;
        let target_end = target_start + remaining[target_start..].find('>')?;
        let url = &remaining[target_start..target_end];

        // Parameters run up to the next comma that is not inside quotes.
        let after_target = &remaining[target_end + 1..];
        let (params, next_links) = match find_outside_quotes(after_target, ',') {
            Some(comma) => (&after_target[..comma], &after_target[comma + 1..]),
            None => (after_target, ""),
        };

        if link_params_have_rel_next(params) {
            return Some(url.to_string());
        }
        remaining = next_links;
    }
}

/// Byte index of the first `needle` in `haystack` that is not inside a
/// double-quoted string (backslash escapes inside quotes are honoured).
fn find_outside_quotes(haystack: &str, needle: char) -> Option<usize> {
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in haystack.char_indices() {
        if escaped {
            escaped = false;
        } else if in_quotes && ch == '\\' {
            escaped = true;
        } else if ch == '"' {
            in_quotes = !in_quotes;
        } else if ch == needle && !in_quotes {
            return Some(idx);
        }
    }
    None
}

/// Whether any `;`-separated parameter in `params` is a `rel` containing `next`.
fn link_params_have_rel_next(params: &str) -> bool {
    let mut rest = params;
    loop {
        let (param, tail) = match find_outside_quotes(rest, ';') {
            Some(idx) => (&rest[..idx], Some(&rest[idx + 1..])),
            None => (rest, None),
        };
        if is_rel_next_param(param) {
            return true;
        }
        match tail {
            Some(next) => rest = next,
            None => return false,
        }
    }
}

/// Whether a single `name=value` link parameter is `rel` with a `next` relation.
fn is_rel_next_param(param: &str) -> bool {
    let Some((name, value)) = param.split_once('=') else {
        return false;
    };
    if !name.trim().eq_ignore_ascii_case("rel") {
        return false;
    }
    let value = value.trim();
    let value = value
        .strip_prefix('"')
        .and_then(|inner| inner.strip_suffix('"'))
        .unwrap_or(value);
    value
        .split_ascii_whitespace()
        .any(|relation| relation.eq_ignore_ascii_case("next"))
}
309
310fn apply_next_url(call: &mut OperationCall, next_url: &str) -> bool {
318 let query_str = if let Some(pos) = next_url.find('?') {
319 &next_url[pos + 1..]
320 } else {
321 tracing::warn!(
322 next_url,
323 "Link next URL has no query string; stopping pagination"
324 );
325 return false;
326 };
327
328 let new_params: HashMap<String, String> = query_str
329 .split('&')
330 .filter_map(|pair| {
331 let mut parts = pair.splitn(2, '=');
332 let key = parts.next().filter(|k| !k.is_empty())?;
333 let val = parts.next().unwrap_or("");
334 Some((
335 urlencoding::decode(key).unwrap_or_default().into_owned(),
336 urlencoding::decode(val).unwrap_or_default().into_owned(),
337 ))
338 })
339 .collect();
340
341 if new_params.is_empty() {
342 return false;
343 }
344
345 call.query_params = new_params;
346 true
347}
348
349fn detect_page_param(params: &HashMap<String, String>) -> String {
354 constants::PAGINATION_PAGE_PARAMS
355 .iter()
356 .find(|&&p| params.contains_key(p))
357 .map_or("page", |&p| p)
358 .to_string()
359}
360
361fn detect_limit_param(params: &HashMap<String, String>) -> String {
364 constants::PAGINATION_LIMIT_PARAMS
365 .iter()
366 .find(|&&p| params.contains_key(p))
367 .map_or("limit", |&p| p)
368 .to_string()
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an otherwise empty call for operation `op` carrying exactly one
    /// query parameter.
    fn call_with_query(name: &str, value: &str) -> crate::invocation::OperationCall {
        crate::invocation::OperationCall {
            operation_id: "op".to_string(),
            path_params: HashMap::new(),
            query_params: HashMap::from([(name.to_string(), value.to_string())]),
            header_params: HashMap::new(),
            body: None,
            custom_headers: vec![],
        }
    }

    // ---- parse_link_next ----

    #[test]
    fn test_parse_link_next_returns_next_url() {
        let header = r#"<https://api.example.com/items?page=2>; rel="next", <https://api.example.com/items?page=10>; rel="last""#;
        let expected = Some("https://api.example.com/items?page=2".to_string());
        assert_eq!(parse_link_next(header), expected);
    }

    #[test]
    fn test_parse_link_next_without_next_returns_none() {
        let header = r#"<https://api.example.com/items?page=10>; rel="last""#;
        assert_eq!(parse_link_next(header), None);
    }

    #[test]
    fn test_parse_link_next_without_quotes() {
        let header = "<https://api.example.com/items?page=2>; rel=next";
        let expected = Some("https://api.example.com/items?page=2".to_string());
        assert_eq!(parse_link_next(header), expected);
    }

    #[test]
    fn test_parse_link_next_empty_returns_none() {
        assert_eq!(parse_link_next(""), None);
    }

    // ---- extract_cursor_value ----

    #[test]
    fn test_extract_cursor_value_simple_field() {
        let json = serde_json::json!({"next_cursor": "abc123", "data": []});
        let cursor = extract_cursor_value(&json, "next_cursor");
        assert_eq!(cursor, Some("abc123".to_string()));
    }

    #[test]
    fn test_extract_cursor_value_dotted_path() {
        let json = serde_json::json!({"page": {"next_cursor": "tok_xyz"}});
        let cursor = extract_cursor_value(&json, "page.next_cursor");
        assert_eq!(cursor, Some("tok_xyz".to_string()));
    }

    #[test]
    fn test_extract_cursor_value_null_returns_none() {
        let json = serde_json::json!({"next_cursor": null});
        assert_eq!(extract_cursor_value(&json, "next_cursor"), None);
    }

    #[test]
    fn test_extract_cursor_value_empty_string_returns_none() {
        let json = serde_json::json!({"next_cursor": ""});
        assert_eq!(extract_cursor_value(&json, "next_cursor"), None);
    }

    // ---- extract_items ----

    #[test]
    fn test_extract_items_from_top_level_array() {
        let json = serde_json::json!([{"id": 1}, {"id": 2}]);
        let items = extract_items(&json);
        assert_eq!(items.len(), 2);
    }

    #[test]
    fn test_extract_items_from_data_wrapper() {
        let json = serde_json::json!({"data": [{"id": 1}], "total": 1});
        let items = extract_items(&json);
        assert_eq!(items.len(), 1);
    }

    #[test]
    fn test_extract_items_from_items_wrapper() {
        let json = serde_json::json!({"items": [{"id": 1}, {"id": 2}], "next_cursor": "abc"});
        let items = extract_items(&json);
        assert_eq!(items.len(), 2);
    }

    #[test]
    fn test_extract_items_single_object_fallback() {
        let json = serde_json::json!({"id": 1, "name": "Alice"});
        let items = extract_items(&json);
        assert_eq!(items.len(), 1);
    }

    #[test]
    fn test_extract_items_empty_array() {
        let json = serde_json::json!([]);
        let items = extract_items(&json);
        assert_eq!(items.len(), 0);
    }

    // ---- advance_offset_strategy ----

    #[test]
    fn test_advance_offset_strategy_increments_page_number() {
        let mut call = call_with_query("page", "1");
        let has_next = advance_offset_strategy(&mut call, "page", 10, 10);
        assert!(has_next);
        assert_eq!(call.query_params["page"], "2");
    }

    #[test]
    fn test_advance_offset_strategy_stops_on_partial_page() {
        let mut call = call_with_query("page", "1");
        let has_next = advance_offset_strategy(&mut call, "page", 3, 10);
        assert!(!has_next, "partial page should return false");
    }

    #[test]
    fn test_advance_offset_strategy_skip_advances_by_page_len() {
        let mut call = call_with_query("skip", "0");

        // Two consecutive full pages: the offset grows by the page length each time.
        for expected in ["10", "20"] {
            let has_next = advance_offset_strategy(&mut call, "skip", 10, 10);
            assert!(has_next);
            assert_eq!(call.query_params["skip"], expected);
        }
    }

    #[test]
    fn test_advance_offset_strategy_offset_advances_by_page_len() {
        let mut call = call_with_query("offset", "0");
        let has_next = advance_offset_strategy(&mut call, "offset", 5, 5);
        assert!(has_next);
        assert_eq!(call.query_params["offset"], "5");
    }
}