// acdp_server/pagination.rs

//! Cursor-pagination helpers for registry store backends.
//!
//! Registries paginate `list` / `search` with an opaque keyset cursor
//! (RFC-ACDP-0002). The standard backend idiom is:
//!
//! 1. fetch `limit + 1` rows from the database in page order,
//! 2. decode each row and apply an in-Rust visibility/status filter, then
//! 3. return at most `limit` items plus a `next_cursor` for the page that
//!    follows.
//!
//! Two correctness invariants in that idiom are easy to get wrong, and
//! every backend re-implements them:
//!
//! - **The "more rows?" signal must come from the raw DB row count, not
//!   the post-filter item count.** Fetching `limit + 1` and testing
//!   `rows.len() > limit` is the sentinel; comparing the *filtered*
//!   `items.len()` against `limit` sends clients to a phantom empty page
//!   whenever the in-Rust filter drops a row on the final page.
//! - **The next cursor must anchor on the last *scanned* row, not the
//!   last *kept* item.** If a whole page is filtered out, anchoring on the
//!   last kept item yields `None` and terminates pagination early even
//!   though the database had more matching rows; anchoring on the last
//!   scanned row keeps the walk going.
//!
//! [`paginate_scanned`] and [`try_paginate_rows`] centralize both
//! invariants (and their regression tests) so a backend supplies only the
//! per-row decode, the keep-filter, and the cursor extractor. The cursor
//! *encoding* stays with the backend — this module is generic over the
//! cursor type and never inspects it.
30
/// One page of results together with the opaque cursor that resumes the
/// walk on the page after it.
///
/// The cursor type `C` defaults to `String`; this type only stores the
/// cursor and never looks inside it.
///
/// `next_cursor` is `Some` only when the backend had at least one more row
/// beyond this page; `None` marks the end of the walk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Paginated<T, C = String> {
    /// The items that survived the keep-filter, in page order.
    pub items: Vec<T>,
    /// Cursor for the next page, or `None` at the end of the result set.
    pub next_cursor: Option<C>,
}
42
43/// Apply a page's keep-filter and derive its `next_cursor` from the rows
44/// already scanned for the page.
45///
46/// `scanned` is the decoded rows for *this page only* — i.e. already
47/// truncated to the page size with `take(limit)` — in page order.
48/// `has_more` MUST be computed from the raw database row count *before*
49/// any in-Rust filtering (the canonical idiom fetches `limit + 1` rows and
50/// tests `rows.len() > limit`); [`try_paginate_rows`] does this for you.
51///
52/// The next cursor is emitted iff `has_more`, and is taken from the last
53/// element of `scanned` (the last row the backend looked at), not from the
54/// last kept item — so a page whose rows are entirely filtered out still
55/// advances the cursor instead of ending pagination early.
56pub fn paginate_scanned<T, C>(
57    scanned: Vec<T>,
58    has_more: bool,
59    keep: impl Fn(&T) -> bool,
60    cursor_of: impl Fn(&T) -> C,
61) -> Paginated<T, C> {
62    let next_cursor = if has_more {
63        scanned.last().map(&cursor_of)
64    } else {
65        None
66    };
67    let items = scanned.into_iter().filter(keep).collect();
68    Paginated { items, next_cursor }
69}
70
71/// Owns the full backend idiom: take the raw `LIMIT limit + 1` result set,
72/// compute the `has_more` sentinel, drop the sentinel row, decode each
73/// remaining row with a fallible `decode`, then delegate to
74/// [`paginate_scanned`].
75///
76/// `rows` is the raw result set exactly as fetched with `LIMIT limit + 1`.
77/// A decode error short-circuits and is returned to the caller; the cursor
78/// type `C` and error type `E` are whatever the backend uses.
79pub fn try_paginate_rows<R, T, C, E>(
80    rows: Vec<R>,
81    limit: usize,
82    decode: impl Fn(R) -> Result<T, E>,
83    keep: impl Fn(&T) -> bool,
84    cursor_of: impl Fn(&T) -> C,
85) -> Result<Paginated<T, C>, E> {
86    let has_more = rows.len() > limit;
87    let mut scanned = Vec::with_capacity(rows.len().min(limit));
88    for row in rows.into_iter().take(limit) {
89        scanned.push(decode(row)?);
90    }
91    Ok(paginate_scanned(scanned, has_more, keep, cursor_of))
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    // A row carries its keyset position and whether the keep-filter
    // accepts it, so tests can exercise the cursor-anchoring rules.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct Row {
        cursor: u32,
        keep: bool,
    }

    fn row(cursor: u32, keep: bool) -> Row {
        Row { cursor, keep }
    }

    // Shorthand: paginate `Row`s using their own `keep` flag and `cursor`.
    fn paginate(scanned: Vec<Row>, has_more: bool) -> Paginated<Row, u32> {
        paginate_scanned(scanned, has_more, |r| r.keep, |r| r.cursor)
    }

    #[test]
    fn empty_input_yields_no_items_and_no_cursor() {
        let page = paginate(vec![], false);
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn empty_input_with_has_more_has_no_row_to_anchor_on() {
        // With nothing scanned there is no row to take a cursor from, so
        // no cursor is emitted even though `has_more` is set.
        let page = paginate(vec![], true);
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn no_more_rows_means_no_cursor_even_with_items() {
        let page = paginate(vec![row(1, true), row(2, true)], false);
        assert_eq!(page.items, vec![row(1, true), row(2, true)]);
        assert_eq!(page.next_cursor, None, "last page must not emit a cursor");
    }

    #[test]
    fn cursor_emitted_when_more_rows_exist() {
        let page = paginate(vec![row(1, true), row(2, true)], true);
        assert_eq!(page.items.len(), 2);
        assert_eq!(
            page.next_cursor,
            Some(2),
            "cursor anchors on last scanned row"
        );
    }

    #[test]
    fn cursor_anchors_on_last_scanned_not_last_kept() {
        // REG-P2-8: the final scanned row is filtered out. The cursor must
        // still advance to it (2), NOT fall back to the last *kept* item
        // (1) — otherwise the next page re-scans row 2 forever.
        let page = paginate(vec![row(1, true), row(2, false)], true);
        assert_eq!(page.items, vec![row(1, true)]);
        assert_eq!(
            page.next_cursor,
            Some(2),
            "cursor must anchor on the last scanned row, even when filtered out"
        );
    }

    #[test]
    fn fully_filtered_page_still_advances_the_cursor() {
        // The critical early-termination fix: every row on a non-final
        // page is dropped, but the walk must continue from the last
        // scanned row rather than ending with an empty page.
        let page = paginate(vec![row(1, false), row(2, false), row(3, false)], true);
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, Some(3));
    }

    #[test]
    fn try_paginate_drops_the_sentinel_row() {
        // limit = 2, fetched limit + 1 = 3 rows → has_more, sentinel (30)
        // dropped, cursor anchors on the last *scanned* row (20).
        let rows = vec![10u32, 20, 30];
        let page: Paginated<u32, u32> =
            try_paginate_rows(rows, 2, Ok::<u32, ()>, |_| true, |v| *v).unwrap();
        assert_eq!(page.items, vec![10, 20]);
        assert_eq!(page.next_cursor, Some(20));
    }

    #[test]
    fn try_paginate_no_more_rows_when_exactly_limit() {
        let rows = vec![10u32, 20];
        let page: Paginated<u32, u32> =
            try_paginate_rows(rows, 2, Ok::<u32, ()>, |_| true, |v| *v).unwrap();
        assert_eq!(page.items, vec![10, 20]);
        assert_eq!(page.next_cursor, None);
    }

    #[test]
    fn try_paginate_propagates_decode_errors() {
        let rows = vec![1u32, 2, 3];
        let result: Result<Paginated<u32, u32>, &'static str> = try_paginate_rows(
            rows,
            2,
            |v| if v == 2 { Err("boom") } else { Ok(v) },
            |_| true,
            |v| *v,
        );
        assert_eq!(result, Err("boom"));
    }

    #[test]
    fn try_paginate_never_decodes_the_sentinel_row() {
        // Only the sentinel (3) would fail to decode; it is dropped before
        // decoding, so the page succeeds.
        let rows = vec![1u32, 2, 3];
        let result: Result<Paginated<u32, u32>, &'static str> = try_paginate_rows(
            rows,
            2,
            |v| if v == 3 { Err("sentinel decoded") } else { Ok(v) },
            |_| true,
            |v| *v,
        );
        let page = result.expect("sentinel row must not be decoded");
        assert_eq!(page.items, vec![1, 2]);
        assert_eq!(page.next_cursor, Some(2));
    }

    #[test]
    fn try_paginate_has_more_comes_from_raw_rows_not_kept_items() {
        // Final page (no sentinel row fetched) on which the filter drops a
        // row: no cursor, regardless of how many items were kept.
        let rows = vec![row(1, true), row(2, false)];
        let page: Paginated<Row, u32> =
            try_paginate_rows(rows, 2, Ok::<Row, ()>, |r| r.keep, |r| r.cursor).unwrap();
        assert_eq!(page.items, vec![row(1, true)]);
        assert_eq!(page.next_cursor, None, "final page must not emit a cursor");
    }

    #[test]
    fn try_paginate_fully_filtered_page_still_advances() {
        // Sentinel present and every scanned row dropped: the cursor must
        // still anchor on the last scanned row (2), not on the sentinel (3).
        let rows = vec![row(1, false), row(2, false), row(3, true)];
        let page: Paginated<Row, u32> =
            try_paginate_rows(rows, 2, Ok::<Row, ()>, |r| r.keep, |r| r.cursor).unwrap();
        assert!(page.items.is_empty());
        assert_eq!(page.next_cursor, Some(2));
    }
}