nodedb_types/calvin.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Primitive Calvin scheduling types shared between `nodedb-physical`
4//! (the physical-plan IR layer) and `nodedb-cluster` (the distributed
5//! Calvin sequencer / scheduler).
6//!
7//! Provides [`SortedVec`], [`EngineKeySet`], and [`PassiveReadKey`] —
8//! the building blocks of Calvin read/write sets. `DependentReadSpec`
9//! and other scheduler-internal aggregates stay in `nodedb-cluster`.
10
11use serde::{Deserialize, Serialize};
12
13/// A newtype over `Vec<T>` that guarantees sorted, deduplicated contents.
14///
15/// Constructed via [`SortedVec::new`], which sorts and deduplicates at
16/// construction time. This property is load-bearing for byte-determinism:
17/// two `SortedVec`s built from the same logical set (in any insertion order)
18/// produce identical serialized bytes.
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct SortedVec<T>(Vec<T>);
21
22impl<T: zerompk::ToMessagePack> zerompk::ToMessagePack for SortedVec<T> {
23 fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
24 self.0.write(writer)
25 }
26}
27
28impl<'de, T> zerompk::FromMessagePack<'de> for SortedVec<T>
29where
30 T: zerompk::FromMessagePack<'de> + Ord + Clone,
31{
32 fn read<R: zerompk::Read<'de>>(reader: &mut R) -> zerompk::Result<Self> {
33 let v = Vec::<T>::read(reader)?;
34 Ok(Self::new(v))
35 }
36}
37
38impl<T: Ord + Clone> SortedVec<T> {
39 /// Build from any slice. Sorts and deduplicates in place.
40 pub fn new(mut items: Vec<T>) -> Self {
41 items.sort();
42 items.dedup();
43 Self(items)
44 }
45
46 pub fn as_slice(&self) -> &[T] {
47 &self.0
48 }
49
50 pub fn is_empty(&self) -> bool {
51 self.0.is_empty()
52 }
53
54 pub fn len(&self) -> usize {
55 self.0.len()
56 }
57
58 pub fn iter(&self) -> std::slice::Iter<'_, T> {
59 self.0.iter()
60 }
61}
62
63impl<T: Ord + Clone> From<Vec<T>> for SortedVec<T> {
64 fn from(v: Vec<T>) -> Self {
65 Self::new(v)
66 }
67}
68
69/// A typed key set for one engine within a read or write set.
70///
71/// Keys are normalized to surrogates (or byte keys for KV) at admission, so
72/// all engine-specific naming is resolved upstream of the sequencer.
73#[derive(
74 Debug,
75 Clone,
76 PartialEq,
77 Eq,
78 Serialize,
79 Deserialize,
80 zerompk::ToMessagePack,
81 zerompk::FromMessagePack,
82)]
83pub enum EngineKeySet {
84 /// Document engine (schemaless or strict): identified by surrogate.
85 Document {
86 collection: String,
87 surrogates: SortedVec<u32>,
88 },
89 /// Vector engine: identified by surrogate.
90 Vector {
91 collection: String,
92 surrogates: SortedVec<u32>,
93 },
94 /// Key-Value engine: identified by raw byte keys.
95 Kv {
96 collection: String,
97 keys: SortedVec<Vec<u8>>,
98 },
99 /// Graph edge engine: identified by (src_surrogate, dst_surrogate) pairs.
100 Edge {
101 collection: String,
102 edges: SortedVec<(u32, u32)>,
103 },
104}
105
106impl EngineKeySet {
107 /// O(1) estimate of the serialized byte size of this key set.
108 ///
109 /// Used by the dependent-read cap check at sequencer admission to bound
110 /// the total bytes that would be Raft-replicated in a `CalvinReadResult`
111 /// entry. This is an estimate, not an exact count; do NOT use it as a
112 /// correctness check — only as a pre-flight guard.
113 pub fn serialized_size_hint(&self) -> usize {
114 match self {
115 // u32 surrogates: 4 bytes each.
116 Self::Document { surrogates, .. } | Self::Vector { surrogates, .. } => {
117 surrogates.len() * 4
118 }
119 // KV keys: sum of key byte lengths.
120 Self::Kv { keys, .. } => keys.iter().map(|k| k.len()).sum(),
121 // Edge: two u32 per edge = 8 bytes each.
122 Self::Edge { edges, .. } => edges.len() * 8,
123 }
124 }
125
126 /// The collection this key set belongs to.
127 pub fn collection(&self) -> &str {
128 match self {
129 Self::Document { collection, .. }
130 | Self::Vector { collection, .. }
131 | Self::Kv { collection, .. }
132 | Self::Edge { collection, .. } => collection,
133 }
134 }
135
136 /// Returns `true` if this key set contains no keys.
137 pub fn is_empty(&self) -> bool {
138 match self {
139 Self::Document { surrogates, .. } => surrogates.is_empty(),
140 Self::Vector { surrogates, .. } => surrogates.is_empty(),
141 Self::Kv { keys, .. } => keys.is_empty(),
142 Self::Edge { edges, .. } => edges.is_empty(),
143 }
144 }
145}
146
147/// A single key that a passive participant must read and broadcast.
148///
149/// Wraps an [`EngineKeySet`]; per the dependent-read protocol each
150/// `PassiveReadKey` contains a single-element (or small) key set. The
151/// sequencer does not enforce single-element sets; the scheduler enforces the
152/// total byte budget via `DependentReadSpec::total_bytes()` (which lives in
153/// `nodedb-cluster`).
154#[derive(
155 Debug,
156 Clone,
157 PartialEq,
158 Eq,
159 Serialize,
160 Deserialize,
161 zerompk::ToMessagePack,
162 zerompk::FromMessagePack,
163)]
164pub struct PassiveReadKey {
165 /// The engine key set to read on the passive vshard.
166 pub engine_key: EngineKeySet,
167}