Skip to main content

nodedb_types/
calvin.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Primitive Calvin scheduling types shared between `nodedb-physical`
4//! (the physical-plan IR layer) and `nodedb-cluster` (the distributed
5//! Calvin sequencer / scheduler).
6//!
7//! Provides [`SortedVec`], [`EngineKeySet`], and [`PassiveReadKey`] —
8//! the building blocks of Calvin read/write sets. `DependentReadSpec`
9//! and other scheduler-internal aggregates stay in `nodedb-cluster`.
10
11use serde::{Deserialize, Serialize};
12
13/// A newtype over `Vec<T>` that guarantees sorted, deduplicated contents.
14///
15/// Constructed via [`SortedVec::new`], which sorts and deduplicates at
16/// construction time. This property is load-bearing for byte-determinism:
17/// two `SortedVec`s built from the same logical set (in any insertion order)
18/// produce identical serialized bytes.
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct SortedVec<T>(Vec<T>);
21
22impl<T: zerompk::ToMessagePack> zerompk::ToMessagePack for SortedVec<T> {
23    fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
24        self.0.write(writer)
25    }
26}
27
28impl<'de, T> zerompk::FromMessagePack<'de> for SortedVec<T>
29where
30    T: zerompk::FromMessagePack<'de> + Ord + Clone,
31{
32    fn read<R: zerompk::Read<'de>>(reader: &mut R) -> zerompk::Result<Self> {
33        let v = Vec::<T>::read(reader)?;
34        Ok(Self::new(v))
35    }
36}
37
38impl<T: Ord + Clone> SortedVec<T> {
39    /// Build from any slice. Sorts and deduplicates in place.
40    pub fn new(mut items: Vec<T>) -> Self {
41        items.sort();
42        items.dedup();
43        Self(items)
44    }
45
46    pub fn as_slice(&self) -> &[T] {
47        &self.0
48    }
49
50    pub fn is_empty(&self) -> bool {
51        self.0.is_empty()
52    }
53
54    pub fn len(&self) -> usize {
55        self.0.len()
56    }
57
58    pub fn iter(&self) -> std::slice::Iter<'_, T> {
59        self.0.iter()
60    }
61}
62
63impl<T: Ord + Clone> From<Vec<T>> for SortedVec<T> {
64    fn from(v: Vec<T>) -> Self {
65        Self::new(v)
66    }
67}
68
69/// A typed key set for one engine within a read or write set.
70///
71/// Keys are normalized to surrogates (or byte keys for KV) at admission, so
72/// all engine-specific naming is resolved upstream of the sequencer.
73#[derive(
74    Debug,
75    Clone,
76    PartialEq,
77    Eq,
78    Serialize,
79    Deserialize,
80    zerompk::ToMessagePack,
81    zerompk::FromMessagePack,
82)]
83pub enum EngineKeySet {
84    /// Document engine (schemaless or strict): identified by surrogate.
85    Document {
86        collection: String,
87        surrogates: SortedVec<u32>,
88    },
89    /// Vector engine: identified by surrogate.
90    Vector {
91        collection: String,
92        surrogates: SortedVec<u32>,
93    },
94    /// Key-Value engine: identified by raw byte keys.
95    Kv {
96        collection: String,
97        keys: SortedVec<Vec<u8>>,
98    },
99    /// Graph edge engine: identified by (src_surrogate, dst_surrogate) pairs.
100    Edge {
101        collection: String,
102        edges: SortedVec<(u32, u32)>,
103    },
104}
105
106impl EngineKeySet {
107    /// O(1) estimate of the serialized byte size of this key set.
108    ///
109    /// Used by the dependent-read cap check at sequencer admission to bound
110    /// the total bytes that would be Raft-replicated in a `CalvinReadResult`
111    /// entry.  This is an estimate, not an exact count; do NOT use it as a
112    /// correctness check — only as a pre-flight guard.
113    pub fn serialized_size_hint(&self) -> usize {
114        match self {
115            // u32 surrogates: 4 bytes each.
116            Self::Document { surrogates, .. } | Self::Vector { surrogates, .. } => {
117                surrogates.len() * 4
118            }
119            // KV keys: sum of key byte lengths.
120            Self::Kv { keys, .. } => keys.iter().map(|k| k.len()).sum(),
121            // Edge: two u32 per edge = 8 bytes each.
122            Self::Edge { edges, .. } => edges.len() * 8,
123        }
124    }
125
126    /// The collection this key set belongs to.
127    pub fn collection(&self) -> &str {
128        match self {
129            Self::Document { collection, .. }
130            | Self::Vector { collection, .. }
131            | Self::Kv { collection, .. }
132            | Self::Edge { collection, .. } => collection,
133        }
134    }
135
136    /// Returns `true` if this key set contains no keys.
137    pub fn is_empty(&self) -> bool {
138        match self {
139            Self::Document { surrogates, .. } => surrogates.is_empty(),
140            Self::Vector { surrogates, .. } => surrogates.is_empty(),
141            Self::Kv { keys, .. } => keys.is_empty(),
142            Self::Edge { edges, .. } => edges.is_empty(),
143        }
144    }
145}
146
147/// A single key that a passive participant must read and broadcast.
148///
149/// Wraps an [`EngineKeySet`]; per the dependent-read protocol each
150/// `PassiveReadKey` contains a single-element (or small) key set.  The
151/// sequencer does not enforce single-element sets; the scheduler enforces the
152/// total byte budget via `DependentReadSpec::total_bytes()` (which lives in
153/// `nodedb-cluster`).
154#[derive(
155    Debug,
156    Clone,
157    PartialEq,
158    Eq,
159    Serialize,
160    Deserialize,
161    zerompk::ToMessagePack,
162    zerompk::FromMessagePack,
163)]
164pub struct PassiveReadKey {
165    /// The engine key set to read on the passive vshard.
166    pub engine_key: EngineKeySet,
167}