1#![allow(clippy::cast_possible_truncation)]
3
4use super::{CursorBoundary, CursorBoundarySlot, ExplainPlan, OrderSpec, PlanError};
5use crate::{
6 db::query::{
7 plan::hash_parts,
8 predicate::{SchemaInfo, validate::literal_matches_type},
9 },
10 error::{ErrorClass, ErrorOrigin, InternalError},
11 model::entity::EntityModel,
12 serialize::{deserialize_bounded, serialize},
13 traits::{EntityKind, FieldValue},
14 value::Value,
15};
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18use thiserror::Error as ThisError;
19
20#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
28pub struct ContinuationSignature([u8; 32]);
29
30impl ContinuationSignature {
31 pub(crate) const fn from_bytes(bytes: [u8; 32]) -> Self {
32 Self(bytes)
33 }
34
35 pub(crate) const fn into_bytes(self) -> [u8; 32] {
36 self.0
37 }
38
39 #[must_use]
40 pub fn as_hex(&self) -> String {
41 crate::db::cursor::encode_cursor(&self.0)
42 }
43}
44
45impl std::fmt::Display for ContinuationSignature {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.write_str(&self.as_hex())
48 }
49}
50
/// Wire-format version written into every encoded continuation token and
/// the only version accepted when decoding.
const CONTINUATION_TOKEN_VERSION_V1: u8 = 1;

/// Size limit (8 KiB) handed to the bounded deserializer when decoding a
/// continuation token.
const MAX_CONTINUATION_TOKEN_BYTES: usize = 8 * 1024;
53
54#[derive(Clone, Debug)]
56pub(crate) enum PrimaryKeyCursorSlotDecodeError {
57 Missing,
58 TypeMismatch { value: Value },
59}
60
61impl PrimaryKeyCursorSlotDecodeError {
62 #[must_use]
64 pub(crate) fn into_mismatch_value(self) -> Option<Value> {
65 match self {
66 Self::Missing => None,
67 Self::TypeMismatch { value } => Some(value),
68 }
69 }
70}
71
72pub(crate) fn decode_primary_key_cursor_slot<K: FieldValue>(
74 slot: &CursorBoundarySlot,
75) -> Result<K, PrimaryKeyCursorSlotDecodeError> {
76 match slot {
77 CursorBoundarySlot::Missing => Err(PrimaryKeyCursorSlotDecodeError::Missing),
78 CursorBoundarySlot::Present(value) => {
79 K::from_value(value).ok_or_else(|| PrimaryKeyCursorSlotDecodeError::TypeMismatch {
80 value: value.clone(),
81 })
82 }
83 }
84}
85
86pub(crate) fn decode_pk_cursor_boundary<E>(
88 boundary: Option<&CursorBoundary>,
89) -> Result<Option<E::Key>, InternalError>
90where
91 E: EntityKind,
92{
93 let Some(boundary) = boundary else {
94 return Ok(None);
95 };
96
97 if boundary.slots.len() != 1 {
98 return Err(InternalError::new(
99 ErrorClass::InvariantViolation,
100 ErrorOrigin::Query,
101 format!(
102 "executor invariant violated: pk-ordered continuation boundary must contain exactly 1 slot, found {}",
103 boundary.slots.len()
104 ),
105 ));
106 }
107
108 decode_primary_key_cursor_slot::<E::Key>(&boundary.slots[0])
109 .map(Some)
110 .map_err(|err| match err {
111 PrimaryKeyCursorSlotDecodeError::Missing => InternalError::new(
112 ErrorClass::InvariantViolation,
113 ErrorOrigin::Query,
114 "executor invariant violated: pk cursor slot must be present",
115 ),
116 PrimaryKeyCursorSlotDecodeError::TypeMismatch { .. } => InternalError::new(
117 ErrorClass::InvariantViolation,
118 ErrorOrigin::Query,
119 "executor invariant violated: pk cursor slot type mismatch",
120 ),
121 })
122}
123
124#[derive(Clone, Debug, Eq, PartialEq)]
130pub(crate) struct ContinuationToken {
131 signature: ContinuationSignature,
132 boundary: CursorBoundary,
133}
134
135impl ContinuationToken {
136 pub(crate) const fn new(signature: ContinuationSignature, boundary: CursorBoundary) -> Self {
137 Self {
138 signature,
139 boundary,
140 }
141 }
142
143 pub(crate) const fn signature(&self) -> ContinuationSignature {
144 self.signature
145 }
146
147 pub(crate) const fn boundary(&self) -> &CursorBoundary {
148 &self.boundary
149 }
150
151 pub(crate) fn encode(&self) -> Result<Vec<u8>, ContinuationTokenError> {
152 let wire = ContinuationTokenWire {
153 version: CONTINUATION_TOKEN_VERSION_V1,
154 signature: self.signature.into_bytes(),
155 boundary: self.boundary.clone(),
156 };
157
158 serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
159 }
160
161 pub(crate) fn decode(bytes: &[u8]) -> Result<Self, ContinuationTokenError> {
162 let wire: ContinuationTokenWire = deserialize_bounded(bytes, MAX_CONTINUATION_TOKEN_BYTES)
163 .map_err(|err| ContinuationTokenError::Decode(err.to_string()))?;
164
165 if wire.version != CONTINUATION_TOKEN_VERSION_V1 {
166 return Err(ContinuationTokenError::UnsupportedVersion {
167 version: wire.version,
168 });
169 }
170
171 Ok(Self {
172 signature: ContinuationSignature::from_bytes(wire.signature),
173 boundary: wire.boundary,
174 })
175 }
176
177 #[cfg(test)]
178 pub(crate) fn encode_with_version_for_test(
179 &self,
180 version: u8,
181 ) -> Result<Vec<u8>, ContinuationTokenError> {
182 let wire = ContinuationTokenWire {
183 version,
184 signature: self.signature.into_bytes(),
185 boundary: self.boundary.clone(),
186 };
187
188 serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
189 }
190}
191
192#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
198pub(crate) enum ContinuationTokenError {
199 #[error("failed to encode continuation token: {0}")]
200 Encode(String),
201
202 #[error("failed to decode continuation token: {0}")]
203 Decode(String),
204
205 #[error("unsupported continuation token version: {version}")]
206 UnsupportedVersion { version: u8 },
207}
208
209#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
214struct ContinuationTokenWire {
215 version: u8,
216 signature: [u8; 32],
217 boundary: CursorBoundary,
218}
219
220pub(crate) fn decode_validated_cursor_boundary(
222 cursor: &[u8],
223 entity_path: &'static str,
224 model: &EntityModel,
225 order: &OrderSpec,
226 expected_signature: ContinuationSignature,
227) -> Result<CursorBoundary, PlanError> {
228 let token = ContinuationToken::decode(cursor).map_err(|err| match err {
229 ContinuationTokenError::Encode(message) | ContinuationTokenError::Decode(message) => {
230 PlanError::InvalidContinuationCursor { reason: message }
231 }
232 ContinuationTokenError::UnsupportedVersion { version } => {
233 PlanError::ContinuationCursorVersionMismatch { version }
234 }
235 })?;
236
237 if token.signature() != expected_signature {
238 return Err(PlanError::ContinuationCursorSignatureMismatch {
239 entity_path,
240 expected: expected_signature.to_string(),
241 actual: token.signature().to_string(),
242 });
243 }
244
245 if token.boundary().slots.len() != order.fields.len() {
246 return Err(PlanError::ContinuationCursorBoundaryArityMismatch {
247 expected: order.fields.len(),
248 found: token.boundary().slots.len(),
249 });
250 }
251
252 validate_cursor_boundary_types(model, order, token.boundary())?;
253
254 Ok(token.boundary().clone())
255}
256
257fn validate_cursor_boundary_types(
259 model: &EntityModel,
260 order: &OrderSpec,
261 boundary: &CursorBoundary,
262) -> Result<(), PlanError> {
263 let schema = SchemaInfo::from_entity_model(model).map_err(PlanError::PredicateInvalid)?;
264
265 for ((field, _), slot) in order.fields.iter().zip(boundary.slots.iter()) {
266 let field_type = schema
267 .field(field)
268 .ok_or_else(|| PlanError::UnknownOrderField {
269 field: field.clone(),
270 })?;
271
272 match slot {
273 CursorBoundarySlot::Missing => {
274 if field == model.primary_key.name {
275 return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
276 field: field.clone(),
277 expected: field_type.to_string(),
278 value: None,
279 });
280 }
281 }
282 CursorBoundarySlot::Present(value) => {
283 if !literal_matches_type(value, field_type) {
284 if field == model.primary_key.name {
285 return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
286 field: field.clone(),
287 expected: field_type.to_string(),
288 value: Some(value.clone()),
289 });
290 }
291
292 return Err(PlanError::ContinuationCursorBoundaryTypeMismatch {
293 field: field.clone(),
294 expected: field_type.to_string(),
295 value: value.clone(),
296 });
297 }
298
299 if field == model.primary_key.name && Value::as_storage_key(value).is_none() {
301 return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
302 field: field.clone(),
303 expected: field_type.to_string(),
304 value: Some(value.clone()),
305 });
306 }
307 }
308 }
309 }
310
311 Ok(())
312}
313
314impl<K> super::LogicalPlan<K>
319where
320 K: FieldValue,
321{
322 #[must_use]
327 pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
328 self.explain().continuation_signature(entity_path)
329 }
330}
331
332impl ExplainPlan {
333 #[must_use]
348 pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
349 let mut hasher = Sha256::new();
350 hasher.update(b"contsig:v1");
351 hash_parts::hash_explain_plan_profile(
352 &mut hasher,
353 self,
354 hash_parts::ExplainHashProfile::ContinuationV1 { entity_path },
355 );
356
357 let digest = hasher.finalize();
358 let mut out = [0u8; 32];
359 out.copy_from_slice(&digest);
360 ContinuationSignature::from_bytes(out)
361 }
362}
363
#[cfg(test)]
mod tests {
    use crate::db::query::intent::{KeyAccess, access_plan_from_keys_value};
    use crate::db::query::plan::{AccessPath, LogicalPlan, OrderDirection, OrderSpec, PageSpec};
    use crate::db::query::predicate::Predicate;
    use crate::db::query::{FieldRef, LoadSpec, QueryMode, ReadConsistency};
    use crate::types::Ulid;
    use crate::value::Value;

    /// Entity path shared by tests that do not vary it.
    const ENTITY_PATH: &str = "tests::Entity";

    /// Baseline plan: full scan with `MissingOk` consistency and nothing else set.
    fn full_scan_plan() -> LogicalPlan<Value> {
        LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk)
    }

    /// Load plan whose access path is derived from the given key list.
    fn by_keys_plan(keys: Vec<Ulid>) -> LogicalPlan<Value> {
        LogicalPlan {
            mode: QueryMode::Load(LoadSpec::new()),
            access: access_plan_from_keys_value(&KeyAccess::Many(keys)),
            predicate: None,
            order: None,
            delete_limit: None,
            page: None,
            consistency: ReadConsistency::MissingOk,
        }
    }

    /// Full-scan plan ordered by the `name` field in the given direction.
    fn ordered_by_name(direction: OrderDirection) -> LogicalPlan<Value> {
        let mut plan = full_scan_plan();
        plan.order = Some(OrderSpec {
            fields: vec![("name".to_string(), direction)],
        });
        plan
    }

    #[test]
    fn signature_is_deterministic_for_equivalent_predicates() {
        let id = Ulid::default();
        let id_clause = || FieldRef::new("id").eq(id);
        let other_clause = || FieldRef::new("other").eq(Value::Text("x".to_string()));

        let plan_with = |clauses| {
            let mut plan = full_scan_plan();
            plan.predicate = Some(Predicate::And(clauses));
            plan
        };

        // Same conjunction, clauses listed in opposite order.
        let forward = plan_with(vec![id_clause(), other_clause()]);
        let reversed = plan_with(vec![other_clause(), id_clause()]);

        assert_eq!(
            forward.continuation_signature(ENTITY_PATH),
            reversed.continuation_signature(ENTITY_PATH)
        );
    }

    #[test]
    fn signature_is_deterministic_for_by_keys() {
        let first = Ulid::from_u128(1);
        let second = Ulid::from_u128(2);

        // Same key set, with a duplicate and a different order on one side.
        let with_duplicate = by_keys_plan(vec![first, second, first]);
        let reordered = by_keys_plan(vec![second, first]);

        assert_eq!(
            with_duplicate.continuation_signature(ENTITY_PATH),
            reordered.continuation_signature(ENTITY_PATH)
        );
    }

    #[test]
    fn signature_excludes_pagination_window_state() {
        let paged_at = |offset| {
            let mut plan = full_scan_plan();
            plan.page = Some(PageSpec {
                limit: Some(10),
                offset,
            });
            plan
        };

        let first_page = paged_at(0);
        let far_page = paged_at(999);

        assert_eq!(
            first_page.continuation_signature(ENTITY_PATH),
            far_page.continuation_signature(ENTITY_PATH)
        );
    }

    #[test]
    fn signature_changes_when_order_changes() {
        let ascending = ordered_by_name(OrderDirection::Asc);
        let descending = ordered_by_name(OrderDirection::Desc);

        assert_ne!(
            ascending.continuation_signature(ENTITY_PATH),
            descending.continuation_signature(ENTITY_PATH)
        );
    }

    #[test]
    fn signature_changes_with_entity_path() {
        let plan = full_scan_plan();

        let signature_a = plan.continuation_signature("tests::EntityA");
        let signature_b = plan.continuation_signature("tests::EntityB");

        assert_ne!(signature_a, signature_b);
    }
}