// icydb_core/db/query/plan/continuation.rs
#![allow(clippy::cast_possible_truncation)]
3
4use super::{CursorBoundary, CursorBoundarySlot, ExplainPlan, OrderSpec, PlanError};
5use crate::{
6 db::query::{
7 plan::hash_parts,
8 predicate::{SchemaInfo, validate::literal_matches_type},
9 },
10 model::entity::EntityModel,
11 serialize::{deserialize_bounded, serialize},
12 traits::FieldValue,
13 value::Value,
14};
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use thiserror::Error as ThisError;
18
/// Opaque 32-byte signature attached to continuation tokens.
///
/// The value is compared byte-for-byte (derived `Eq`/`Ord`/`Hash`) and is
/// rendered as 64 lowercase hexadecimal characters by [`Self::as_hex`] and by
/// the `Display` implementation.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ContinuationSignature([u8; 32]);

impl ContinuationSignature {
    /// Wraps 32 raw bytes as a signature without inspecting them.
    pub(crate) const fn from_bytes(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }

    /// Returns the 32 raw bytes held by this signature.
    pub(crate) const fn into_bytes(self) -> [u8; 32] {
        self.0
    }

    /// Renders the signature as lowercase hex, two characters per byte.
    #[must_use]
    pub fn as_hex(&self) -> String {
        const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

        let mut hex = String::with_capacity(self.0.len() * 2);
        for &byte in &self.0 {
            // High nibble first so each byte reads the same as `{:02x}`.
            hex.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            hex.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
        }
        hex
    }
}

impl std::fmt::Display for ContinuationSignature {
    /// Writes the same text as [`ContinuationSignature::as_hex`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let hex = self.as_hex();
        f.write_str(hex.as_str())
    }
}
54
/// Wire-format version stamped into tokens by `ContinuationToken::encode`;
/// `ContinuationToken::decode` rejects any other value.
const CONTINUATION_TOKEN_VERSION_V1: u8 = 1;
/// Byte limit (8 KiB) handed to `deserialize_bounded` when decoding a token.
// `dead_code` is silenced outside tests, matching the allowance on `decode`,
// which is the reader of this constant visible in this file.
#[cfg_attr(not(test), allow(dead_code))]
const MAX_CONTINUATION_TOKEN_BYTES: usize = 8 * 1024;
58
59#[derive(Clone, Debug, Eq, PartialEq)]
65pub(crate) struct ContinuationToken {
66 signature: ContinuationSignature,
67 boundary: CursorBoundary,
68}
69
70impl ContinuationToken {
71 pub(crate) const fn new(signature: ContinuationSignature, boundary: CursorBoundary) -> Self {
72 Self {
73 signature,
74 boundary,
75 }
76 }
77
78 #[cfg_attr(not(test), allow(dead_code))]
79 pub(crate) const fn signature(&self) -> ContinuationSignature {
80 self.signature
81 }
82
83 #[cfg_attr(not(test), allow(dead_code))]
84 pub(crate) const fn boundary(&self) -> &CursorBoundary {
85 &self.boundary
86 }
87
88 pub(crate) fn encode(&self) -> Result<Vec<u8>, ContinuationTokenError> {
89 let wire = ContinuationTokenWire {
90 version: CONTINUATION_TOKEN_VERSION_V1,
91 signature: self.signature.into_bytes(),
92 boundary: self.boundary.clone(),
93 };
94
95 serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
96 }
97
98 #[cfg_attr(not(test), allow(dead_code))]
99 pub(crate) fn decode(bytes: &[u8]) -> Result<Self, ContinuationTokenError> {
100 let wire: ContinuationTokenWire = deserialize_bounded(bytes, MAX_CONTINUATION_TOKEN_BYTES)
101 .map_err(|err| ContinuationTokenError::Decode(err.to_string()))?;
102
103 if wire.version != CONTINUATION_TOKEN_VERSION_V1 {
104 return Err(ContinuationTokenError::UnsupportedVersion {
105 version: wire.version,
106 });
107 }
108
109 Ok(Self {
110 signature: ContinuationSignature::from_bytes(wire.signature),
111 boundary: wire.boundary,
112 })
113 }
114
115 #[cfg(test)]
116 pub(crate) fn encode_with_version_for_test(
117 &self,
118 version: u8,
119 ) -> Result<Vec<u8>, ContinuationTokenError> {
120 let wire = ContinuationTokenWire {
121 version,
122 signature: self.signature.into_bytes(),
123 boundary: self.boundary.clone(),
124 };
125
126 serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
127 }
128}
129
/// Failures produced while encoding or decoding a `ContinuationToken`.
#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
pub(crate) enum ContinuationTokenError {
    /// Serialization failed; carries the underlying error rendered as text.
    #[error("failed to encode continuation token: {0}")]
    Encode(String),

    /// Deserialization failed; carries the underlying error rendered as text.
    #[error("failed to decode continuation token: {0}")]
    Decode(String),

    /// The decoded token's version byte is not `CONTINUATION_TOKEN_VERSION_V1`.
    #[error("unsupported continuation token version: {version}")]
    UnsupportedVersion { version: u8 },
}
146
/// Serialized shape of a `ContinuationToken`: a version byte, the raw
/// signature bytes, and the cursor boundary.
// NOTE(review): field order presumably determines the serialized layout —
// confirm against the `serialize` format before reordering or adding fields.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct ContinuationTokenWire {
    // Compared against `CONTINUATION_TOKEN_VERSION_V1` in `ContinuationToken::decode`.
    version: u8,
    // Raw bytes of the `ContinuationSignature`.
    signature: [u8; 32],
    // Boundary carried verbatim from the in-memory token.
    boundary: CursorBoundary,
}
157
158#[cfg_attr(not(test), allow(dead_code))]
160pub(crate) fn decode_validated_cursor_boundary(
161 cursor: &[u8],
162 entity_path: &'static str,
163 model: &EntityModel,
164 order: &OrderSpec,
165 expected_signature: ContinuationSignature,
166) -> Result<CursorBoundary, PlanError> {
167 let token = ContinuationToken::decode(cursor).map_err(|err| match err {
168 ContinuationTokenError::Encode(message) | ContinuationTokenError::Decode(message) => {
169 PlanError::InvalidContinuationCursor { reason: message }
170 }
171 ContinuationTokenError::UnsupportedVersion { version } => {
172 PlanError::ContinuationCursorVersionMismatch { version }
173 }
174 })?;
175
176 if token.signature() != expected_signature {
177 return Err(PlanError::ContinuationCursorSignatureMismatch {
178 entity_path,
179 expected: expected_signature.to_string(),
180 actual: token.signature().to_string(),
181 });
182 }
183
184 if token.boundary().slots.len() != order.fields.len() {
185 return Err(PlanError::ContinuationCursorBoundaryArityMismatch {
186 expected: order.fields.len(),
187 found: token.boundary().slots.len(),
188 });
189 }
190
191 validate_cursor_boundary_types(model, order, token.boundary())?;
192
193 Ok(token.boundary().clone())
194}
195
196#[cfg_attr(not(test), allow(dead_code))]
198fn validate_cursor_boundary_types(
199 model: &EntityModel,
200 order: &OrderSpec,
201 boundary: &CursorBoundary,
202) -> Result<(), PlanError> {
203 let schema = SchemaInfo::from_entity_model(model).map_err(PlanError::PredicateInvalid)?;
204
205 for ((field, _), slot) in order.fields.iter().zip(boundary.slots.iter()) {
206 let field_type = schema
207 .field(field)
208 .ok_or_else(|| PlanError::UnknownOrderField {
209 field: field.clone(),
210 })?;
211
212 match slot {
213 CursorBoundarySlot::Missing => {
214 if field == model.primary_key.name {
215 return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
216 field: field.clone(),
217 expected: field_type.to_string(),
218 value: None,
219 });
220 }
221 }
222 CursorBoundarySlot::Present(value) => {
223 if !literal_matches_type(value, field_type) {
224 if field == model.primary_key.name {
225 return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
226 field: field.clone(),
227 expected: field_type.to_string(),
228 value: Some(value.clone()),
229 });
230 }
231
232 return Err(PlanError::ContinuationCursorBoundaryTypeMismatch {
233 field: field.clone(),
234 expected: field_type.to_string(),
235 value: value.clone(),
236 });
237 }
238
239 if field == model.primary_key.name && Value::as_storage_key(value).is_none() {
241 return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
242 field: field.clone(),
243 expected: field_type.to_string(),
244 value: Some(value.clone()),
245 });
246 }
247 }
248 }
249 }
250
251 Ok(())
252}
253
254impl<K> super::LogicalPlan<K>
259where
260 K: FieldValue,
261{
262 #[must_use]
267 pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
268 self.explain().continuation_signature(entity_path)
269 }
270}
271
272impl ExplainPlan {
273 #[must_use]
288 pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
289 let mut hasher = Sha256::new();
290 hasher.update(b"contsig:v1");
291 hash_parts::hash_explain_plan_profile(
292 &mut hasher,
293 self,
294 hash_parts::ExplainHashProfile::ContinuationV1 { entity_path },
295 );
296
297 let digest = hasher.finalize();
298 let mut out = [0u8; 32];
299 out.copy_from_slice(&digest);
300 ContinuationSignature::from_bytes(out)
301 }
302}
303
#[cfg(test)]
mod tests {
    use crate::db::query::intent::{KeyAccess, access_plan_from_keys_value};
    use crate::db::query::plan::{AccessPath, LogicalPlan, OrderDirection, OrderSpec, PageSpec};
    use crate::db::query::predicate::Predicate;
    use crate::db::query::{FieldRef, LoadSpec, QueryMode, ReadConsistency};
    use crate::types::Ulid;
    use crate::value::Value;

    /// Entity path shared by the tests that do not vary it.
    const ENTITY: &str = "tests::Entity";

    /// Full-scan plan with `MissingOk` consistency and nothing else set.
    fn full_scan_plan() -> LogicalPlan<Value> {
        LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk)
    }

    /// Load plan whose access path is derived from an explicit key list.
    fn by_keys_plan(keys: Vec<Ulid>) -> LogicalPlan<Value> {
        LogicalPlan {
            mode: QueryMode::Load(LoadSpec::new()),
            access: access_plan_from_keys_value(&KeyAccess::Many(keys)),
            predicate: None,
            order: None,
            delete_limit: None,
            page: None,
            consistency: ReadConsistency::MissingOk,
        }
    }

    /// Full-scan plan ordered by the `name` field in the given direction.
    fn plan_ordered_by_name(direction: OrderDirection) -> LogicalPlan<Value> {
        let mut plan = full_scan_plan();
        plan.order = Some(OrderSpec {
            fields: vec![("name".to_string(), direction)],
        });
        plan
    }

    #[test]
    fn signature_is_deterministic_for_equivalent_predicates() {
        let id = Ulid::default();
        let by_id = || FieldRef::new("id").eq(id);
        let by_other = || FieldRef::new("other").eq(Value::Text("x".to_string()));

        // Same conjuncts, opposite order.
        let mut forward = full_scan_plan();
        forward.predicate = Some(Predicate::And(vec![by_id(), by_other()]));

        let mut reversed = full_scan_plan();
        reversed.predicate = Some(Predicate::And(vec![by_other(), by_id()]));

        assert_eq!(
            forward.continuation_signature(ENTITY),
            reversed.continuation_signature(ENTITY)
        );
    }

    #[test]
    fn signature_is_deterministic_for_by_keys() {
        let first = Ulid::from_u128(1);
        let second = Ulid::from_u128(2);

        // One list repeats a key and uses a different order than the other.
        let with_duplicate = by_keys_plan(vec![first, second, first]);
        let swapped = by_keys_plan(vec![second, first]);

        assert_eq!(
            with_duplicate.continuation_signature(ENTITY),
            swapped.continuation_signature(ENTITY)
        );
    }

    #[test]
    fn signature_excludes_pagination_window_state() {
        let paged_at = |offset| {
            let mut plan = full_scan_plan();
            plan.page = Some(PageSpec {
                limit: Some(10),
                offset,
            });
            plan
        };

        assert_eq!(
            paged_at(0).continuation_signature(ENTITY),
            paged_at(999).continuation_signature(ENTITY)
        );
    }

    #[test]
    fn signature_changes_when_order_changes() {
        let ascending = plan_ordered_by_name(OrderDirection::Asc);
        let descending = plan_ordered_by_name(OrderDirection::Desc);

        assert_ne!(
            ascending.continuation_signature(ENTITY),
            descending.continuation_signature(ENTITY)
        );
    }

    #[test]
    fn signature_changes_with_entity_path() {
        let plan = full_scan_plan();

        assert_ne!(
            plan.continuation_signature("tests::EntityA"),
            plan.continuation_signature("tests::EntityB")
        );
    }
}