Skip to main content

icydb_core/db/query/plan/
continuation.rs

1//! Continuation signature for cursor pagination compatibility checks.
2#![allow(clippy::cast_possible_truncation)]
3
4use super::{CursorBoundary, CursorBoundarySlot, ExplainPlan, OrderSpec, PlanError};
5use crate::{
6    db::query::{
7        plan::hash_parts,
8        predicate::{SchemaInfo, validate::literal_matches_type},
9    },
10    model::entity::EntityModel,
11    serialize::{deserialize_bounded, serialize},
12    traits::FieldValue,
13    value::Value,
14};
15use serde::{Deserialize, Serialize};
16use sha2::{Digest, Sha256};
17use thiserror::Error as ThisError;
18
///
/// ContinuationSignature
///
/// Stable, deterministic hash of continuation-relevant plan semantics.
/// Excludes windowing state (`limit`, `offset`) and cursor boundaries.
///
/// The wrapped value is a raw 32-byte digest; `as_hex` and `Display`
/// both render it as 64 lowercase hexadecimal characters.
///

#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ContinuationSignature([u8; 32]);

impl ContinuationSignature {
    /// Wrap an already-computed 32-byte digest.
    pub(crate) const fn from_bytes(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }

    /// Return the raw 32-byte digest.
    pub(crate) const fn into_bytes(self) -> [u8; 32] {
        self.0
    }

    /// Render the digest as a lowercase hex string (two characters per byte).
    #[must_use]
    pub fn as_hex(&self) -> String {
        use std::fmt::Write as _;

        let mut out = String::with_capacity(self.0.len() * 2);
        for byte in &self.0 {
            // `fmt::Write` for `String` never returns an error, so the result
            // carries no information worth propagating.
            let _ = write!(out, "{byte:02x}");
        }
        out
    }
}

impl std::fmt::Display for ContinuationSignature {
    /// Write the same lowercase hex text as `as_hex`, streaming each byte
    /// into the formatter instead of allocating an intermediate `String`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.iter().try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}
54
/// Version tag written into every encoded token and required on decode.
const CONTINUATION_TOKEN_VERSION_V1: u8 = 1;

/// Size limit handed to the bounded deserializer when decoding a token (8 KiB).
#[cfg_attr(not(test), allow(dead_code))]
const MAX_CONTINUATION_TOKEN_BYTES: usize = 8_192;
58
59///
60/// ContinuationToken
61/// Opaque cursor payload bound to a continuation signature.
62///
63
64#[derive(Clone, Debug, Eq, PartialEq)]
65pub(crate) struct ContinuationToken {
66    signature: ContinuationSignature,
67    boundary: CursorBoundary,
68}
69
70impl ContinuationToken {
71    pub(crate) const fn new(signature: ContinuationSignature, boundary: CursorBoundary) -> Self {
72        Self {
73            signature,
74            boundary,
75        }
76    }
77
78    #[cfg_attr(not(test), allow(dead_code))]
79    pub(crate) const fn signature(&self) -> ContinuationSignature {
80        self.signature
81    }
82
83    #[cfg_attr(not(test), allow(dead_code))]
84    pub(crate) const fn boundary(&self) -> &CursorBoundary {
85        &self.boundary
86    }
87
88    pub(crate) fn encode(&self) -> Result<Vec<u8>, ContinuationTokenError> {
89        let wire = ContinuationTokenWire {
90            version: CONTINUATION_TOKEN_VERSION_V1,
91            signature: self.signature.into_bytes(),
92            boundary: self.boundary.clone(),
93        };
94
95        serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
96    }
97
98    #[cfg_attr(not(test), allow(dead_code))]
99    pub(crate) fn decode(bytes: &[u8]) -> Result<Self, ContinuationTokenError> {
100        let wire: ContinuationTokenWire = deserialize_bounded(bytes, MAX_CONTINUATION_TOKEN_BYTES)
101            .map_err(|err| ContinuationTokenError::Decode(err.to_string()))?;
102
103        if wire.version != CONTINUATION_TOKEN_VERSION_V1 {
104            return Err(ContinuationTokenError::UnsupportedVersion {
105                version: wire.version,
106            });
107        }
108
109        Ok(Self {
110            signature: ContinuationSignature::from_bytes(wire.signature),
111            boundary: wire.boundary,
112        })
113    }
114
115    #[cfg(test)]
116    pub(crate) fn encode_with_version_for_test(
117        &self,
118        version: u8,
119    ) -> Result<Vec<u8>, ContinuationTokenError> {
120        let wire = ContinuationTokenWire {
121            version,
122            signature: self.signature.into_bytes(),
123            boundary: self.boundary.clone(),
124        };
125
126        serialize(&wire).map_err(|err| ContinuationTokenError::Encode(err.to_string()))
127    }
128}
129
130///
131/// ContinuationTokenError
132/// Cursor token encoding/decoding failures.
133///
134
135#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
136pub(crate) enum ContinuationTokenError {
137    #[error("failed to encode continuation token: {0}")]
138    Encode(String),
139
140    #[error("failed to decode continuation token: {0}")]
141    Decode(String),
142
143    #[error("unsupported continuation token version: {version}")]
144    UnsupportedVersion { version: u8 },
145}
146
147///
148/// ContinuationTokenWire
149///
150
151#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
152struct ContinuationTokenWire {
153    version: u8,
154    signature: [u8; 32],
155    boundary: CursorBoundary,
156}
157
158// Decode and validate one continuation cursor against a canonical plan surface.
159#[cfg_attr(not(test), allow(dead_code))]
160pub(crate) fn decode_validated_cursor_boundary(
161    cursor: &[u8],
162    entity_path: &'static str,
163    model: &EntityModel,
164    order: &OrderSpec,
165    expected_signature: ContinuationSignature,
166) -> Result<CursorBoundary, PlanError> {
167    let token = ContinuationToken::decode(cursor).map_err(|err| match err {
168        ContinuationTokenError::Encode(message) | ContinuationTokenError::Decode(message) => {
169            PlanError::InvalidContinuationCursor { reason: message }
170        }
171        ContinuationTokenError::UnsupportedVersion { version } => {
172            PlanError::ContinuationCursorVersionMismatch { version }
173        }
174    })?;
175
176    if token.signature() != expected_signature {
177        return Err(PlanError::ContinuationCursorSignatureMismatch {
178            entity_path,
179            expected: expected_signature.to_string(),
180            actual: token.signature().to_string(),
181        });
182    }
183
184    if token.boundary().slots.len() != order.fields.len() {
185        return Err(PlanError::ContinuationCursorBoundaryArityMismatch {
186            expected: order.fields.len(),
187            found: token.boundary().slots.len(),
188        });
189    }
190
191    validate_cursor_boundary_types(model, order, token.boundary())?;
192
193    Ok(token.boundary().clone())
194}
195
196// Validate decoded cursor boundary slot types against canonical order fields.
197#[cfg_attr(not(test), allow(dead_code))]
198fn validate_cursor_boundary_types(
199    model: &EntityModel,
200    order: &OrderSpec,
201    boundary: &CursorBoundary,
202) -> Result<(), PlanError> {
203    let schema = SchemaInfo::from_entity_model(model).map_err(PlanError::PredicateInvalid)?;
204
205    for ((field, _), slot) in order.fields.iter().zip(boundary.slots.iter()) {
206        let field_type = schema
207            .field(field)
208            .ok_or_else(|| PlanError::UnknownOrderField {
209                field: field.clone(),
210            })?;
211
212        match slot {
213            CursorBoundarySlot::Missing => {
214                if field == model.primary_key.name {
215                    return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
216                        field: field.clone(),
217                        expected: field_type.to_string(),
218                        value: None,
219                    });
220                }
221            }
222            CursorBoundarySlot::Present(value) => {
223                if !literal_matches_type(value, field_type) {
224                    if field == model.primary_key.name {
225                        return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
226                            field: field.clone(),
227                            expected: field_type.to_string(),
228                            value: Some(value.clone()),
229                        });
230                    }
231
232                    return Err(PlanError::ContinuationCursorBoundaryTypeMismatch {
233                        field: field.clone(),
234                        expected: field_type.to_string(),
235                        value: value.clone(),
236                    });
237                }
238
239                // Primary-key slots must also satisfy key decoding semantics.
240                if field == model.primary_key.name && Value::as_storage_key(value).is_none() {
241                    return Err(PlanError::ContinuationCursorPrimaryKeyTypeMismatch {
242                        field: field.clone(),
243                        expected: field_type.to_string(),
244                        value: Some(value.clone()),
245                    });
246                }
247            }
248        }
249    }
250
251    Ok(())
252}
253
254///
255/// LogicalPlan
256///
257
258impl<K> super::LogicalPlan<K>
259where
260    K: FieldValue,
261{
262    /// Compute a continuation signature bound to the entity path.
263    ///
264    /// This is used to validate that a continuation token belongs to the
265    /// same canonical query shape.
266    #[must_use]
267    pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
268        self.explain().continuation_signature(entity_path)
269    }
270}
271
272impl ExplainPlan {
273    /// Compute the continuation signature for this explain plan.
274    ///
275    /// Included fields:
276    /// - entity path
277    /// - mode (load/delete)
278    /// - access path
279    /// - normalized predicate
280    /// - canonical order-by (including implicit PK tie-break)
281    /// - projection marker (currently full entity row projection)
282    ///
283    /// Excluded fields:
284    /// - pagination window (`limit`, `offset`)
285    /// - delete limits
286    /// - cursor boundary/token state
287    #[must_use]
288    pub fn continuation_signature(&self, entity_path: &'static str) -> ContinuationSignature {
289        let mut hasher = Sha256::new();
290        hasher.update(b"contsig:v1");
291        hash_parts::hash_explain_plan_profile(
292            &mut hasher,
293            self,
294            hash_parts::ExplainHashProfile::ContinuationV1 { entity_path },
295        );
296
297        let digest = hasher.finalize();
298        let mut out = [0u8; 32];
299        out.copy_from_slice(&digest);
300        ContinuationSignature::from_bytes(out)
301    }
302}
303
304///
305/// TESTS
306///
307
#[cfg(test)]
mod tests {
    use crate::db::query::intent::{KeyAccess, access_plan_from_keys_value};
    use crate::db::query::plan::{AccessPath, LogicalPlan, OrderDirection, OrderSpec, PageSpec};
    use crate::db::query::predicate::Predicate;
    use crate::db::query::{FieldRef, LoadSpec, QueryMode, ReadConsistency};
    use crate::types::Ulid;
    use crate::value::Value;

    // Entity path shared by every test that does not vary it.
    const ENTITY: &str = "tests::Entity";

    // Baseline plan: full scan, no predicate, order, or page.
    fn full_scan_plan() -> LogicalPlan<Value> {
        LogicalPlan::new(AccessPath::<Value>::FullScan, ReadConsistency::MissingOk)
    }

    // Full-scan plan ordered by `name` in the given direction.
    fn plan_ordered_by_name(direction: OrderDirection) -> LogicalPlan<Value> {
        let mut plan = full_scan_plan();
        plan.order = Some(OrderSpec {
            fields: vec![("name".to_string(), direction)],
        });
        plan
    }

    // Conjunct order inside an `And` must not affect the signature.
    #[test]
    fn signature_is_deterministic_for_equivalent_predicates() {
        let id = Ulid::default();

        let predicate_a = Predicate::And(vec![
            FieldRef::new("id").eq(id),
            FieldRef::new("other").eq(Value::Text("x".to_string())),
        ]);
        let predicate_b = Predicate::And(vec![
            FieldRef::new("other").eq(Value::Text("x".to_string())),
            FieldRef::new("id").eq(id),
        ]);

        let mut plan_a = full_scan_plan();
        plan_a.predicate = Some(predicate_a);

        let mut plan_b = full_scan_plan();
        plan_b.predicate = Some(predicate_b);

        assert_eq!(
            plan_a.continuation_signature(ENTITY),
            plan_b.continuation_signature(ENTITY)
        );
    }

    // Key lists that differ only in order and duplicates must hash the same.
    #[test]
    fn signature_is_deterministic_for_by_keys() {
        let a = Ulid::from_u128(1);
        let b = Ulid::from_u128(2);

        let load_plan = |access| -> LogicalPlan<Value> {
            LogicalPlan {
                mode: QueryMode::Load(LoadSpec::new()),
                access,
                predicate: None,
                order: None,
                delete_limit: None,
                page: None,
                consistency: ReadConsistency::MissingOk,
            }
        };

        let plan_a = load_plan(access_plan_from_keys_value(&KeyAccess::Many(vec![a, b, a])));
        let plan_b = load_plan(access_plan_from_keys_value(&KeyAccess::Many(vec![b, a])));

        assert_eq!(
            plan_a.continuation_signature(ENTITY),
            plan_b.continuation_signature(ENTITY)
        );
    }

    // Both halves of the pagination window are documented as excluded, so
    // vary `offset` and `limit` independently against the same baseline.
    #[test]
    fn signature_excludes_pagination_window_state() {
        let mut baseline = full_scan_plan();
        baseline.page = Some(PageSpec {
            limit: Some(10),
            offset: 0,
        });

        let mut other_offset = full_scan_plan();
        other_offset.page = Some(PageSpec {
            limit: Some(10),
            offset: 999,
        });

        let mut other_limit = full_scan_plan();
        other_limit.page = Some(PageSpec {
            limit: Some(25),
            offset: 0,
        });

        assert_eq!(
            baseline.continuation_signature(ENTITY),
            other_offset.continuation_signature(ENTITY)
        );
        assert_eq!(
            baseline.continuation_signature(ENTITY),
            other_limit.continuation_signature(ENTITY)
        );
    }

    // Flipping the sort direction changes the query shape.
    #[test]
    fn signature_changes_when_order_changes() {
        let plan_a = plan_ordered_by_name(OrderDirection::Asc);
        let plan_b = plan_ordered_by_name(OrderDirection::Desc);

        assert_ne!(
            plan_a.continuation_signature(ENTITY),
            plan_b.continuation_signature(ENTITY)
        );
    }

    // The same plan under two entity paths must not share a signature.
    #[test]
    fn signature_changes_with_entity_path() {
        let plan = full_scan_plan();

        assert_ne!(
            plan.continuation_signature("tests::EntityA"),
            plan.continuation_signature("tests::EntityB")
        );
    }
}
431            plan.continuation_signature("tests::EntityA"),
432            plan.continuation_signature("tests::EntityB")
433        );
434    }
435}