Skip to main content

icydb_core/db/query/plan/
continuation.rs

1//! Continuation signature for cursor pagination compatibility checks.
2#![allow(clippy::cast_possible_truncation)]
3
4use super::{CursorBoundary, CursorBoundarySlot, ExplainPlan, OrderSpec, PlanError};
5use crate::{
6    db::query::{
7        plan::hash_parts,
8        predicate::{SchemaInfo, validate::literal_matches_type},
9    },
10    error::{ErrorClass, ErrorOrigin, InternalError},
11    model::entity::EntityModel,
12    serialize::{deserialize_bounded, serialize},
13    traits::{EntityKind, FieldValue},
14    value::Value,
15};
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18use thiserror::Error as ThisError;
19
20///
21/// ContinuationSignature
22///
23/// Stable, deterministic hash of continuation-relevant plan semantics.
24/// Excludes windowing state (`limit`, `offset`) and cursor boundaries.
25///
26
27#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
28pub struct ContinuationSignature([u8; 32]);
29
30impl ContinuationSignature {
31    pub(crate) const fn from_bytes(bytes: [u8; 32]) -> Self {
32        Self(bytes)
33    }
34
35    pub(crate) const fn into_bytes(self) -> [u8; 32] {
36        self.0
37    }
38
39    #[must_use]
40    pub fn as_hex(&self) -> String {
41        crate::db::cursor::encode_cursor(&self.0)
42    }
43}
44
45impl std::fmt::Display for ContinuationSignature {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.write_str(&self.as_hex())
48    }
49}
50
51const CONTINUATION_TOKEN_VERSION_V1: u8 = 1;
52const MAX_CONTINUATION_TOKEN_BYTES: usize = 8 * 1024;
53
54/// Decode errors for typed primary-key cursor slot extraction.
55#[derive(Clone, Debug)]
56pub(crate) enum PrimaryKeyCursorSlotDecodeError {
57    Missing,
58    TypeMismatch { value: Value },
59}
60
61impl PrimaryKeyCursorSlotDecodeError {
62    /// Convert this decode failure into the optional offending value shape.
63    #[must_use]
64    pub(crate) fn into_mismatch_value(self) -> Option<Value> {
65        match self {
66            Self::Missing => None,
67            Self::TypeMismatch { value } => Some(value),
68        }
69    }
70}
71
72// Decode one primary-key cursor slot into a typed key value.
73pub(crate) fn decode_primary_key_cursor_slot<K: FieldValue>(
74    slot: &CursorBoundarySlot,
75) -> Result<K, PrimaryKeyCursorSlotDecodeError> {
76    match slot {
77        CursorBoundarySlot::Missing => Err(PrimaryKeyCursorSlotDecodeError::Missing),
78        CursorBoundarySlot::Present(value) => {
79            K::from_value(value).ok_or_else(|| PrimaryKeyCursorSlotDecodeError::TypeMismatch {
80                value: value.clone(),
81            })
82        }
83    }
84}
85
86/// Decode a typed primary-key cursor boundary for PK-ordered executor paths.
87pub(crate) fn decode_pk_cursor_boundary<E>(
88    boundary: Option<&CursorBoundary>,
89) -> Result<Option<E::Key>, InternalError>
90where
91    E: EntityKind,
92{
93    let Some(boundary) = boundary else {
94        return Ok(None);
95    };
96
97    if boundary.slots.len() != 1 {
98        return Err(InternalError::new(
99            ErrorClass::InvariantViolation,
100            ErrorOrigin::Query,
101            format!(
102                "executor invariant violated: pk-ordered continuation boundary must contain exactly 1 slot, found {}",
103                boundary.slots.len()
104            ),
105        ));
106    }
107
108    decode_primary_key_cursor_slot::<E::Key>(&boundary.slots[0])
109        .map(Some)
110        .map_err(|err| match err {
111            PrimaryKeyCursorSlotDecodeError::Missing => InternalError::new(
112                ErrorClass::InvariantViolation,
113                ErrorOrigin::Query,
114                "executor invariant violated: pk cursor slot must be present",
115            ),
116            PrimaryKeyCursorSlotDecodeError::TypeMismatch { .. } => InternalError::new(
117                ErrorClass::InvariantViolation,
118                ErrorOrigin::Query,
119                "executor invariant violated: pk cursor slot type mismatch",
120            ),
121        })
122}
123
124///
125/// ContinuationToken
126/// Opaque cursor payload bound to a continuation signature.
127///
128
129#[derive(Clone, Debug, Eq, PartialEq)]
130pub(crate) struct ContinuationToken {
131    signature: ContinuationSignature,
132    boundary: CursorBoundary,
133}
134
135impl ContinuationToken {
136    pub(crate) const fn new(signature: ContinuationSignature, boundary: CursorBoundary) -> Self {
137        Self {
138            signature,
139            boundary,
140        }
141    }
142
143    pub(crate) const fn signature(&self) -> ContinuationSignature {
144        self.signature
145    }
146
147    pub(crate) const fn boundary(&self) -> &CursorBoundary {
148        &self.boundary
149    }
150
151    pub(crate) fn encode(&self) -> Result<Vec<u8>, ContinuationTokenError> {
152        let wire = ContinuationTokenWire {
153            version: CONTINUATION_TOKEN_VERSION_V1,
154            signature: self.signature.into_bytes(),
155            boundary: self.boundary.clone(),
156        };
157
158        serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
159    }
160
161    pub(crate) fn decode(bytes: &[u8]) -> Result<Self, ContinuationTokenError> {
162        let wire: ContinuationTokenWire = deserialize_bounded(bytes, MAX_CONTINUATION_TOKEN_BYTES)
163            .map_err(|err| ContinuationTokenError::Decode(err.to_string()))?;
164
165        if wire.version != CONTINUATION_TOKEN_VERSION_V1 {
166            return Err(ContinuationTokenError::UnsupportedVersion {
167                version: wire.version,
168            });
169        }
170
171        Ok(Self {
172            signature: ContinuationSignature::from_bytes(wire.signature),
173            boundary: wire.boundary,
174        })
175    }
176
177    #[cfg(test)]
178    pub(crate) fn encode_with_version_for_test(
179        &self,
180        version: u8,
181    ) -> Result<Vec<u8>, ContinuationTokenError> {
182        let wire = ContinuationTokenWire {
183            version,
184            signature: self.signature.into_bytes(),
185            boundary: self.boundary.clone(),
186        };
187
188        serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
189    }
190}
191
192///
193/// ContinuationTokenError
194/// Cursor token encoding/decoding failures.
195///
196
197#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
198pub(crate) enum ContinuationTokenError {
199    #[error("failed to encode continuation token: {0}")]
200    Encode(String),
201
202    #[error("failed to decode continuation token: {0}")]
203    Decode(String),
204
205    #[error("unsupported continuation token version: {version}")]
206    UnsupportedVersion { version: u8 },
207}
208
209///
210/// ContinuationTokenWire
211///
212
213#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
214struct ContinuationTokenWire {
215    version: u8,
216    signature: [u8; 32],
217    boundary: CursorBoundary,
218}
219
220// Decode and validate one continuation cursor against a canonical plan surface.
221pub(crate) fn decode_validated_cursor_boundary(
222    cursor: &[u8],
223    entity_path: &'static str,
224    model: &EntityModel,
225    order: &OrderSpec,
226    expected_signature: ContinuationSignature,
227) -> Result<CursorBoundary, PlanError> {
228    let token = ContinuationToken::decode(cursor).map_err(|err| match err {
229        ContinuationTokenError::Encode(message) | ContinuationTokenError::Decode(message) => {
230            PlanError::InvalidContinuationCursor { reason: message }
231        }
232        ContinuationTokenError::UnsupportedVersion { version } => {
233            PlanError::ContinuationCursorVersionMismatch { version }
234        }
235    })?;
236
237    if token.signature() != expected_signature {
238        return Err(PlanError::ContinuationCursorSignatureMismatch {
239            entity_path,
240            expected: expected_signature.to_string(),
241            actual: token.signature().to_string(),
242        });
243    }
244
245    if token.boundary().slots.len() != order.fields.len() {
246        return Err(PlanError::ContinuationCursorBoundaryArityMismatch {
247            expected: order.fields.len(),
248            found: token.boundary().slots.len(),
249        });
250    }
251
252    validate_cursor_boundary_types(model, order, token.boundary())?;
253
254    Ok(token.boundary().clone())
255}
256
257// Validate decoded cursor boundary slot types against canonical order fields.
258fn validate_cursor_boundary_types(
259    model: &EntityModel,
260    order: &OrderSpec,
261    boundary: &CursorBoundary,
262) -> Result<(), PlanError> {
263    let schema = SchemaInfo::from_entity_model(model).map_err(PlanError::PredicateInvalid)?;
264
265    for ((field, _), slot) in order.fields.iter().zip(boundary.slots.iter()) {
266        let field_type = schema
267            .field(field)
268            .ok_or_else(|| PlanError::UnknownOrderField {
269                field: field.clone(),
270            })?;
271
272        match slot {
273            CursorBoundarySlot::Missing => {
274                if field == model.primary_key.name {
275                    return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
276                        field: field.clone(),
277                        expected: field_type.to_string(),
278                        value: None,
279                    });
280                }
281            }
282            CursorBoundarySlot::Present(value) => {
283                if !literal_matches_type(value, field_type) {
284                    if field == model.primary_key.name {
285                        return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
286                            field: field.clone(),
287                            expected: field_type.to_string(),
288                            value: Some(value.clone()),
289                        });
290                    }
291
292                    return Err(PlanError::ContinuationCursorBoundaryTypeMismatch {
293                        field: field.clone(),
294                        expected: field_type.to_string(),
295                        value: value.clone(),
296                    });
297                }
298
299                // Primary-key slots must also satisfy key decoding semantics.
300                if field == model.primary_key.name && Value::as_storage_key(value).is_none() {
301                    return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
302                        field: field.clone(),
303                        expected: field_type.to_string(),
304                        value: Some(value.clone()),
305                    });
306                }
307            }
308        }
309    }
310
311    Ok(())
312}
313
314///
315/// LogicalPlan
316///
317
318impl<K> super::LogicalPlan<K>
319where
320    K: FieldValue,
321{
322    /// Compute a continuation signature bound to the entity path.
323    ///
324    /// This is used to validate that a continuation token belongs to the
325    /// same canonical query shape.
326    #[must_use]
327    pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
328        self.explain().continuation_signature(entity_path)
329    }
330}
331
332impl ExplainPlan {
333    /// Compute the continuation signature for this explain plan.
334    ///
335    /// Included fields:
336    /// - entity path
337    /// - mode (load/delete)
338    /// - access path
339    /// - normalized predicate
340    /// - canonical order-by (including implicit PK tie-break)
341    /// - projection marker (currently full entity row projection)
342    ///
343    /// Excluded fields:
344    /// - pagination window (`limit`, `offset`)
345    /// - delete limits
346    /// - cursor boundary/token state
347    #[must_use]
348    pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
349        let mut hasher = Sha256::new();
350        hasher.update(b"contsig:v1");
351        hash_parts::hash_explain_plan_profile(
352            &mut hasher,
353            self,
354            hash_parts::ExplainHashProfile::ContinuationV1 { entity_path },
355        );
356
357        let digest = hasher.finalize();
358        let mut out = [0u8; 32];
359        out.copy_from_slice(&digest);
360        ContinuationSignature::from_bytes(out)
361    }
362}
363
364///
365/// TESTS
366///
367
368#[cfg(test)]
369mod tests {
370    use crate::db::query::intent::{KeyAccess, access_plan_from_keys_value};
371    use crate::db::query::plan::{AccessPath, LogicalPlan};
372    use crate::db::query::predicate::Predicate;
373    use crate::db::query::{FieldRef, QueryMode, ReadConsistency};
374    use crate::types::Ulid;
375    use crate::value::Value;
376
377    #[test]
378    fn signature_is_deterministic_for_equivalent_predicates() {
379        let id = Ulid::default();
380
381        let predicate_a = Predicate::And(vec![
382            FieldRef::new("id").eq(id),
383            FieldRef::new("other").eq(Value::Text("x".to_string())),
384        ]);
385        let predicate_b = Predicate::And(vec![
386            FieldRef::new("other").eq(Value::Text("x".to_string())),
387            FieldRef::new("id").eq(id),
388        ]);
389
390        let mut plan_a: LogicalPlan<Value> =
391            LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk);
392        plan_a.predicate = Some(predicate_a);
393
394        let mut plan_b: LogicalPlan<Value> =
395            LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk);
396        plan_b.predicate = Some(predicate_b);
397
398        assert_eq!(
399            plan_a.continuation_signature("tests::Entity"),
400            plan_b.continuation_signature("tests::Entity")
401        );
402    }
403
404    #[test]
405    fn signature_is_deterministic_for_by_keys() {
406        let a = Ulid::from_u128(1);
407        let b = Ulid::from_u128(2);
408
409        let access_a = access_plan_from_keys_value(&KeyAccess::Many(vec![a, b, a]));
410        let access_b = access_plan_from_keys_value(&KeyAccess::Many(vec![b, a]));
411
412        let plan_a: LogicalPlan<Value> = LogicalPlan {
413            mode: QueryMode::Load(crate::db::query::LoadSpec::new()),
414            access: access_a,
415            predicate: None,
416            order: None,
417            delete_limit: None,
418            page: None,
419            consistency: ReadConsistency::MissingOk,
420        };
421        let plan_b: LogicalPlan<Value> = LogicalPlan {
422            mode: QueryMode::Load(crate::db::query::LoadSpec::new()),
423            access: access_b,
424            predicate: None,
425            order: None,
426            delete_limit: None,
427            page: None,
428            consistency: ReadConsistency::MissingOk,
429        };
430
431        assert_eq!(
432            plan_a.continuation_signature("tests::Entity"),
433            plan_b.continuation_signature("tests::Entity")
434        );
435    }
436
437    #[test]
438    fn signature_excludes_pagination_window_state() {
439        let mut plan_a: LogicalPlan<Value> =
440            LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk);
441        let mut plan_b: LogicalPlan<Value> =
442            LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk);
443
444        plan_a.page = Some(crate::db::query::plan::PageSpec {
445            limit: Some(10),
446            offset: 0,
447        });
448        plan_b.page = Some(crate::db::query::plan::PageSpec {
449            limit: Some(10),
450            offset: 999,
451        });
452
453        assert_eq!(
454            plan_a.continuation_signature("tests::Entity"),
455            plan_b.continuation_signature("tests::Entity")
456        );
457    }
458
459    #[test]
460    fn signature_changes_when_order_changes() {
461        let mut plan_a: LogicalPlan<Value> =
462            LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk);
463        let mut plan_b: LogicalPlan<Value> =
464            LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk);
465
466        plan_a.order = Some(crate::db::query::plan::OrderSpec {
467            fields: vec![(
468                "name".to_string(),
469                crate::db::query::plan::OrderDirection::Asc,
470            )],
471        });
472        plan_b.order = Some(crate::db::query::plan::OrderSpec {
473            fields: vec![(
474                "name".to_string(),
475                crate::db::query::plan::OrderDirection::Desc,
476            )],
477        });
478
479        assert_ne!(
480            plan_a.continuation_signature("tests::Entity"),
481            plan_b.continuation_signature("tests::Entity")
482        );
483    }
484
485    #[test]
486    fn signature_changes_with_entity_path() {
487        let plan: LogicalPlan<Value> =
488            LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk);
489
490        assert_ne!(
491            plan.continuation_signature("tests::EntityA"),
492            plan.continuation_signature("tests::EntityB")
493        );
494    }
495}