fraiseql_db/postgres/adapter/relay.rs
1//! `RelayDatabaseAdapter` implementation for `PostgresAdapter`.
2
3use fraiseql_error::{FraiseQLError, Result};
4
5use super::{PostgresAdapter, escape_jsonb_key};
6use crate::{
7 dialect::PostgresDialect,
8 identifier::quote_postgres_identifier,
9 postgres::where_generator::PostgresWhereGenerator,
10 traits::{CursorValue, RelayDatabaseAdapter, RelayPageResult},
11 types::{
12 QueryParam,
13 sql_hints::{OrderByClause, OrderDirection},
14 },
15 where_clause::WhereClause,
16};
17
18impl RelayDatabaseAdapter for PostgresAdapter {
19 /// Execute keyset (cursor-based) pagination against a JSONB view.
20 ///
21 /// # `totalCount` semantics
22 ///
23 /// When `include_total_count` is `true`, **two queries** are issued on the same
24 /// connection:
25 ///
26 /// 1. A count query — `SELECT COUNT(*) FROM {view} WHERE {user_filter}` — that reflects the
27 /// **full connection** size, ignoring cursor position. This is required by the Relay Cursor
28 /// Connections spec, which defines `totalCount` as the count of all objects in the
29 /// connection, regardless of `after`/`before`.
30 ///
31 /// 2. A page query — the cursor-filtered, limited result set.
32 ///
33 /// The two-query approach fixes a previous bug where `COUNT(*) OVER()` ran
34 /// inside the cursor-filtered subquery, causing `totalCount` to shrink as the
35 /// cursor advanced. It also handles the edge case where the current page is
36 /// empty but the total count is non-zero (e.g., cursor past the last row).
37 ///
38 /// When `include_total_count` is `false`, only the page query is issued.
39 ///
40 /// # Performance note
41 ///
42 /// The count query scans all rows matching the user filter without LIMIT. On
43 /// large unfiltered tables this may be slow. Mitigations:
44 /// - Only enable `totalCount` when the client explicitly requests it (enforced by the executor
45 /// via `include_total_count`).
46 /// - Add a `statement_timeout` on the connection for relay queries on very large datasets.
47 /// - Maintain a denormalised count table or materialised view for hot paths.
48 async fn execute_relay_page(
49 &self,
50 view: &str,
51 cursor_column: &str,
52 after: Option<CursorValue>,
53 before: Option<CursorValue>,
54 limit: u32,
55 forward: bool,
56 where_clause: Option<&WhereClause>,
57 order_by: Option<&[OrderByClause]>,
58 include_total_count: bool,
59 ) -> Result<RelayPageResult> {
60 let quoted_view = quote_postgres_identifier(view);
61 let quoted_col = quote_postgres_identifier(cursor_column);
62
63 // ── Cursor condition (page query only, NOT the count query) ────────────
64 //
65 // Per the Relay spec, totalCount ignores cursor position. The cursor
66 // condition is therefore excluded from the count query.
67 //
68 // The cursor occupies at most one parameter slot ($1) at the front of the
69 // page query's parameter list.
70 //
71 // UUID cursors use `$1::uuid` cast; BIGINT cursors use plain `$1`.
72 let cursor_param: Option<QueryParam>;
73 let cursor_where_part: Option<String>;
74 let active_cursor = if forward { after } else { before };
75 match active_cursor {
76 None => {
77 cursor_param = None;
78 cursor_where_part = None;
79 },
80 Some(CursorValue::Int64(pk)) => {
81 let op = if forward { ">" } else { "<" };
82 cursor_param = Some(QueryParam::BigInt(pk));
83 cursor_where_part = Some(format!("{quoted_col} {op} $1"));
84 },
85 Some(CursorValue::Uuid(uuid)) => {
86 let op = if forward { ">" } else { "<" };
87 cursor_param = Some(QueryParam::Text(uuid));
88 cursor_where_part = Some(format!("{quoted_col} {op} $1::uuid"));
89 },
90 }
91 let cursor_param_count: usize = usize::from(cursor_param.is_some());
92
93 // ── User WHERE clause ──────────────────────────────────────────────────
94 //
95 // Used in BOTH the count query (offset 0) and the page query (offset by
96 // cursor_param_count so parameter indices don't collide).
97 let mut user_where_json_params: Vec<serde_json::Value> = Vec::new();
98 let page_user_where_sql: Option<String> = if let Some(clause) = where_clause {
99 let generator = PostgresWhereGenerator::new(PostgresDialect);
100 let (sql, params) = generator.generate_with_param_offset(clause, cursor_param_count)?;
101 user_where_json_params = params;
102 Some(sql)
103 } else {
104 None
105 };
106 let user_param_count = user_where_json_params.len();
107
108 // ── ORDER BY clause ────────────────────────────────────────────────────
109 //
110 // Custom sort columns first, then cursor column as tiebreaker for stable
111 // keyset pagination.
112 let order_sql = if let Some(clauses) = order_by {
113 let mut parts: Vec<String> = clauses
114 .iter()
115 .map(|c| {
116 let dir = match c.direction {
117 OrderDirection::Asc => "ASC",
118 OrderDirection::Desc => "DESC",
119 };
120 // escape_jsonb_key is defense-in-depth: field names are already
121 // validated as GraphQL identifiers (which cannot contain `'`).
122 format!("data->>'{field}' {dir}", field = escape_jsonb_key(&c.field))
123 })
124 .collect();
125 let primary_dir = if forward { "ASC" } else { "DESC" };
126 parts.push(format!("{quoted_col} {primary_dir}"));
127 format!(" ORDER BY {}", parts.join(", "))
128 } else {
129 let dir = if forward { "ASC" } else { "DESC" };
130 format!(" ORDER BY {quoted_col} {dir}")
131 };
132
133 // ── Page WHERE SQL ─────────────────────────────────────────────────────
134 //
135 // Combines cursor condition AND user filter with offset parameter indices.
136 let cursor_part = cursor_where_part.as_deref().unwrap_or("");
137 let user_part =
138 page_user_where_sql.as_deref().map(|s| format!("({s})")).unwrap_or_default();
139 let page_where_sql = if cursor_part.is_empty() && user_part.is_empty() {
140 String::new()
141 } else if cursor_part.is_empty() {
142 format!(" WHERE {user_part}")
143 } else if user_part.is_empty() {
144 format!(" WHERE {cursor_part}")
145 } else {
146 format!(" WHERE {cursor_part} AND {user_part}")
147 };
148
149 // ── LIMIT parameter index ──────────────────────────────────────────────
150 let limit_idx = cursor_param_count + user_param_count + 1;
151
152 // ── Page SQL ───────────────────────────────────────────────────────────
153 //
154 // Backward pagination wraps the inner query in a subquery to re-sort
155 // the descending page back to ascending order.
156 let page_sql = if forward {
157 format!("SELECT data FROM {quoted_view}{page_where_sql}{order_sql} LIMIT ${limit_idx}")
158 } else {
159 let inner = format!(
160 "SELECT data, {quoted_col} AS _relay_cursor \
161 FROM {quoted_view}{page_where_sql}{order_sql} LIMIT ${limit_idx}"
162 );
163 format!("SELECT data FROM ({inner}) _relay_page ORDER BY _relay_cursor ASC")
164 };
165
166 // ── Page params: [cursor?, user_where_params..., limit] ────────────────
167 let mut page_typed_params: Vec<QueryParam> = Vec::new();
168 if let Some(cp) = cursor_param {
169 page_typed_params.push(cp);
170 }
171 for v in &user_where_json_params {
172 page_typed_params.push(QueryParam::from(v.clone()));
173 }
174 page_typed_params.push(QueryParam::BigInt(i64::from(limit)));
175
176 let client = self.acquire_connection_with_retry().await?;
177
178 // ── Execute page query ─────────────────────────────────────────────────
179 let page_param_refs = crate::types::as_sql_param_refs(&page_typed_params);
180
181 let page_rows = client.query(&page_sql, &page_param_refs).await.map_err(|e| {
182 FraiseQLError::Database {
183 message: e.to_string(),
184 sql_state: e.code().map(|c| c.code().to_string()),
185 }
186 })?;
187
188 let rows: Vec<crate::types::JsonbValue> = page_rows
189 .iter()
190 .map(|row| {
191 let data: serde_json::Value = row.get("data");
192 crate::types::JsonbValue::new(data)
193 })
194 .collect();
195
196 // ── Count query (Relay spec: totalCount ignores cursor position) ────────
197 //
198 // The WHERE clause is regenerated with offset 0 (no cursor parameter prefix)
199 // because this is a standalone query. Using the same connection avoids an
200 // extra pool acquisition.
201 let total_count = if include_total_count {
202 let (count_sql, count_typed_params) = if let Some(clause) = where_clause {
203 let generator = PostgresWhereGenerator::new(PostgresDialect);
204 let (where_sql, params) = generator.generate_with_param_offset(clause, 0)?;
205 let sql = format!("SELECT COUNT(*) FROM {quoted_view} WHERE ({where_sql})");
206 let typed: Vec<QueryParam> = params.into_iter().map(QueryParam::from).collect();
207 (sql, typed)
208 } else {
209 (format!("SELECT COUNT(*) FROM {quoted_view}"), Vec::<QueryParam>::new())
210 };
211
212 let count_param_refs = crate::types::as_sql_param_refs(&count_typed_params);
213
214 let count_row = client.query_one(&count_sql, &count_param_refs).await.map_err(|e| {
215 FraiseQLError::Database {
216 message: e.to_string(),
217 sql_state: e.code().map(|c| c.code().to_string()),
218 }
219 })?;
220
221 let total: i64 = count_row.get(0);
222 // cast_unsigned() is the clippy-recommended alternative to `as u64` for i64;
223 // it has the same bit-pattern semantics but makes the sign-loss intent explicit.
224 // Row counts from COUNT(*) are always non-negative so sign loss is impossible.
225 Some(total.cast_unsigned())
226 } else {
227 None
228 };
229
230 Ok(RelayPageResult::new(rows, total_count))
231 }
232}