acdp_server/pagination.rs
1//! Cursor-pagination helpers for registry store backends.
2//!
3//! Registries paginate `list` / `search` with an opaque keyset cursor
4//! (RFC-ACDP-0002). The standard backend idiom is:
5//!
6//! 1. fetch `limit + 1` rows from the database in page order,
7//! 2. decode each row and apply an in-Rust visibility/status filter, then
8//! 3. return at most `limit` items plus a `next_cursor` for the page that
9//! follows.
10//!
11//! Two correctness invariants in that idiom are easy to get wrong, and
12//! every backend re-implements them:
13//!
14//! - **The "more rows?" signal must come from the raw DB row count, not
15//! the post-filter item count.** Fetching `limit + 1` and testing
16//! `rows.len() > limit` is the sentinel; comparing the *filtered*
17//! `items.len()` against `limit` sends clients to a phantom empty page
18//! whenever the in-Rust filter drops a row on the final page.
19//! - **The next cursor must anchor on the last *scanned* row, not the
20//! last *kept* item.** If a whole page is filtered out, anchoring on the
21//! last kept item yields `None` and terminates pagination early even
22//! though the database had more matching rows; anchoring on the last
23//! scanned row keeps the walk going.
24//!
25//! [`paginate_scanned`] and [`try_paginate_rows`] centralize both
26//! invariants (and their regression tests) so a backend supplies only the
27//! per-row decode, the keep-filter, and the cursor extractor. The cursor
28//! *encoding* stays with the backend — this module is generic over the
29//! cursor type and never inspects it.
30
/// One page of filtered results together with the cursor that resumes the
/// walk after it.
///
/// `T` is the decoded item type. `C` is the backend's cursor type and
/// defaults to `String`; this type stores the cursor without interpreting
/// it.
///
/// `next_cursor` is `Some` only when the backend saw at least one row past
/// the end of this page. `None` means the walk is finished.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Paginated<T, C = String> {
    /// Items that passed the keep-filter, in page order.
    pub items: Vec<T>,
    /// Where the next page starts, or `None` once the result set is
    /// exhausted.
    pub next_cursor: Option<C>,
}
42
43/// Apply a page's keep-filter and derive its `next_cursor` from the rows
44/// already scanned for the page.
45///
46/// `scanned` is the decoded rows for *this page only* — i.e. already
47/// truncated to the page size with `take(limit)` — in page order.
48/// `has_more` MUST be computed from the raw database row count *before*
49/// any in-Rust filtering (the canonical idiom fetches `limit + 1` rows and
50/// tests `rows.len() > limit`); [`try_paginate_rows`] does this for you.
51///
52/// The next cursor is emitted iff `has_more`, and is taken from the last
53/// element of `scanned` (the last row the backend looked at), not from the
54/// last kept item — so a page whose rows are entirely filtered out still
55/// advances the cursor instead of ending pagination early.
56pub fn paginate_scanned<T, C>(
57 scanned: Vec<T>,
58 has_more: bool,
59 keep: impl Fn(&T) -> bool,
60 cursor_of: impl Fn(&T) -> C,
61) -> Paginated<T, C> {
62 let next_cursor = if has_more {
63 scanned.last().map(&cursor_of)
64 } else {
65 None
66 };
67 let items = scanned.into_iter().filter(keep).collect();
68 Paginated { items, next_cursor }
69}
70
71/// Owns the full backend idiom: take the raw `LIMIT limit + 1` result set,
72/// compute the `has_more` sentinel, drop the sentinel row, decode each
73/// remaining row with a fallible `decode`, then delegate to
74/// [`paginate_scanned`].
75///
76/// `rows` is the raw result set exactly as fetched with `LIMIT limit + 1`.
77/// A decode error short-circuits and is returned to the caller; the cursor
78/// type `C` and error type `E` are whatever the backend uses.
79pub fn try_paginate_rows<R, T, C, E>(
80 rows: Vec<R>,
81 limit: usize,
82 decode: impl Fn(R) -> Result<T, E>,
83 keep: impl Fn(&T) -> bool,
84 cursor_of: impl Fn(&T) -> C,
85) -> Result<Paginated<T, C>, E> {
86 let has_more = rows.len() > limit;
87 let mut scanned = Vec::with_capacity(rows.len().min(limit));
88 for row in rows.into_iter().take(limit) {
89 scanned.push(decode(row)?);
90 }
91 Ok(paginate_scanned(scanned, has_more, keep, cursor_of))
92}
93
#[cfg(test)]
mod tests {
    //! Regression tests for the two pagination invariants: `has_more` is
    //! taken from the raw row count, and the next cursor anchors on the last
    //! *scanned* row. Both are exercised directly through
    //! `paginate_scanned` and end-to-end through `try_paginate_rows`,
    //! including with a keep-filter that actually drops rows.

    use super::*;

    // A row carries its keyset position and whether the keep-filter
    // accepts it, so tests can exercise the cursor-anchoring rules.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct Row {
        cursor: u32,
        keep: bool,
    }

    fn row(cursor: u32, keep: bool) -> Row {
        Row { cursor, keep }
    }

    fn paginate(scanned: Vec<Row>, has_more: bool) -> Paginated<Row, u32> {
        paginate_scanned(scanned, has_more, |r| r.keep, |r| r.cursor)
    }

    // Keep-filter used by the `try_paginate_rows` tests: even values only.
    fn is_even(v: &u32) -> bool {
        *v % 2 == 0
    }

    #[test]
    fn empty_input_yields_no_items_and_no_cursor() {
        let page = paginate(vec![], false);
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn has_more_with_nothing_scanned_yields_no_cursor() {
        // There is no scanned row to anchor on, so no cursor can be
        // produced even though the caller reported more rows.
        let page = paginate(vec![], true);
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn no_more_rows_means_no_cursor_even_with_items() {
        let page = paginate(vec![row(1, true), row(2, true)], false);
        assert_eq!(page.items, vec![row(1, true), row(2, true)]);
        assert_eq!(page.next_cursor, None, "last page must not emit a cursor");
    }

    #[test]
    fn cursor_emitted_when_more_rows_exist() {
        let page = paginate(vec![row(1, true), row(2, true)], true);
        assert_eq!(page.items.len(), 2);
        assert_eq!(
            page.next_cursor,
            Some(2),
            "cursor anchors on last scanned row"
        );
    }

    #[test]
    fn cursor_anchors_on_last_scanned_not_last_kept() {
        // REG-P2-8: the final scanned row is filtered out. The cursor must
        // still advance to it (2), NOT fall back to the last *kept* item
        // (1) — otherwise the next page starts by re-scanning row 2.
        let page = paginate(vec![row(1, true), row(2, false)], true);
        assert_eq!(page.items, vec![row(1, true)]);
        assert_eq!(
            page.next_cursor,
            Some(2),
            "cursor must anchor on the last scanned row, even when filtered out"
        );
    }

    #[test]
    fn fully_filtered_page_still_advances_the_cursor() {
        // The critical early-termination fix: every row on a non-final
        // page is dropped, but the walk must continue from the last
        // scanned row rather than ending with an empty page.
        let page = paginate(vec![row(1, false), row(2, false), row(3, false)], true);
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, Some(3));
    }

    #[test]
    fn kept_items_preserve_page_order() {
        let page = paginate(
            vec![row(5, true), row(6, false), row(7, true), row(8, true)],
            true,
        );
        assert_eq!(page.items, vec![row(5, true), row(7, true), row(8, true)]);
        assert_eq!(page.next_cursor, Some(8));
    }

    #[test]
    fn try_paginate_drops_the_sentinel_row() {
        // limit = 2, fetched limit + 1 = 3 rows → has_more, sentinel (30)
        // dropped, cursor anchors on the last *scanned* row (20).
        let rows = vec![10u32, 20, 30];
        let page: Paginated<u32, u32> =
            try_paginate_rows(rows, 2, Ok::<u32, ()>, |_| true, |v| *v).unwrap();
        assert_eq!(page.items, vec![10, 20]);
        assert_eq!(page.next_cursor, Some(20));
    }

    #[test]
    fn try_paginate_no_more_rows_when_exactly_limit() {
        let rows = vec![10u32, 20];
        let page: Paginated<u32, u32> =
            try_paginate_rows(rows, 2, Ok::<u32, ()>, |_| true, |v| *v).unwrap();
        assert_eq!(page.items, vec![10, 20]);
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn try_paginate_empty_rows_yield_empty_last_page() {
        let page: Paginated<u32, u32> =
            try_paginate_rows(Vec::<u32>::new(), 2, Ok::<u32, ()>, |_| true, |v| *v).unwrap();
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn try_paginate_has_more_comes_from_raw_rows_not_kept_items() {
        // limit = 3, four raw rows → has_more. The filter keeps only one
        // item, but the sentinel decision and the cursor anchor (13, the
        // last scanned row) must be unaffected by that.
        let rows = vec![10u32, 11, 13, 40];
        let page: Paginated<u32, u32> =
            try_paginate_rows(rows, 3, Ok::<u32, ()>, is_even, |v: &u32| *v).unwrap();
        assert_eq!(page.items, vec![10]);
        assert_eq!(page.next_cursor, Some(13));
    }

    #[test]
    fn try_paginate_final_page_with_filtered_rows_emits_no_cursor() {
        // Exactly `limit` raw rows and the filter drops one: this is the
        // final page, so no cursor may be emitted (no phantom empty page).
        let rows = vec![10u32, 11];
        let page: Paginated<u32, u32> =
            try_paginate_rows(rows, 2, Ok::<u32, ()>, is_even, |v: &u32| *v).unwrap();
        assert_eq!(page.items, vec![10]);
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn try_paginate_fully_filtered_page_still_advances() {
        // Every scanned row is dropped on a non-final page; the walk must
        // continue from the last scanned row (3), not stop.
        let rows = vec![1u32, 3, 5];
        let page: Paginated<u32, u32> =
            try_paginate_rows(rows, 2, Ok::<u32, ()>, is_even, |v: &u32| *v).unwrap();
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, Some(3));
    }

    #[test]
    fn try_paginate_propagates_decode_errors() {
        let rows = vec![1u32, 2, 3];
        let result: Result<Paginated<u32, u32>, &'static str> = try_paginate_rows(
            rows,
            2,
            |v| if v == 2 { Err("boom") } else { Ok(v) },
            |_| true,
            |v| *v,
        );
        assert_eq!(result, Err("boom"));
    }

    #[test]
    fn try_paginate_never_decodes_the_sentinel_row() {
        // The look-ahead row only signals "more rows exist"; it belongs to
        // the next page, so a decode failure on it must not fail this page.
        let rows = vec![1u32, 2, 3];
        let result: Result<Paginated<u32, u32>, &'static str> = try_paginate_rows(
            rows,
            2,
            |v| if v == 3 { Err("sentinel decoded") } else { Ok(v) },
            |_| true,
            |v| *v,
        );
        assert_eq!(
            result,
            Ok(Paginated {
                items: vec![1, 2],
                next_cursor: Some(2),
            })
        );
    }
}