1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// SPDX-License-Identifier: Apache-2.0
//! Primitive Calvin scheduling types shared between `nodedb-physical`
//! (the physical-plan IR layer) and `nodedb-cluster` (the distributed
//! Calvin sequencer / scheduler).
//!
//! Provides [`SortedVec`], [`EngineKeySet`], and [`PassiveReadKey`] —
//! the building blocks of Calvin read/write sets. `DependentReadSpec`
//! and other scheduler-internal aggregates stay in `nodedb-cluster`.
use serde::{Deserialize, Serialize};
/// A newtype over `Vec<T>` that guarantees sorted, deduplicated contents.
///
/// Constructed via [`SortedVec::new`], which sorts and deduplicates at
/// construction time. This property is load-bearing for byte-determinism:
/// two `SortedVec`s built from the same logical set (in any insertion order)
/// produce identical serialized bytes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SortedVec<T>(Vec<T>);
impl<T: zerompk::ToMessagePack> zerompk::ToMessagePack for SortedVec<T> {
fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
self.0.write(writer)
}
}
impl<'de, T> zerompk::FromMessagePack<'de> for SortedVec<T>
where
T: zerompk::FromMessagePack<'de> + Ord + Clone,
{
fn read<R: zerompk::Read<'de>>(reader: &mut R) -> zerompk::Result<Self> {
let v = Vec::<T>::read(reader)?;
Ok(Self::new(v))
}
}
impl<T: Ord + Clone> SortedVec<T> {
/// Build from any slice. Sorts and deduplicates in place.
pub fn new(mut items: Vec<T>) -> Self {
items.sort();
items.dedup();
Self(items)
}
pub fn as_slice(&self) -> &[T] {
&self.0
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn iter(&self) -> std::slice::Iter<'_, T> {
self.0.iter()
}
}
impl<T: Ord + Clone> From<Vec<T>> for SortedVec<T> {
fn from(v: Vec<T>) -> Self {
Self::new(v)
}
}
/// A typed key set for one engine within a read or write set.
///
/// Keys are normalized to surrogates (or byte keys for KV) at admission, so
/// all engine-specific naming is resolved upstream of the sequencer.
#[derive(
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub enum EngineKeySet {
/// Document engine (schemaless or strict): identified by surrogate.
Document {
collection: String,
surrogates: SortedVec<u32>,
},
/// Vector engine: identified by surrogate.
Vector {
collection: String,
surrogates: SortedVec<u32>,
},
/// Key-Value engine: identified by raw byte keys.
Kv {
collection: String,
keys: SortedVec<Vec<u8>>,
},
/// Graph edge engine: identified by (src_surrogate, dst_surrogate) pairs.
Edge {
collection: String,
edges: SortedVec<(u32, u32)>,
},
}
impl EngineKeySet {
/// O(1) estimate of the serialized byte size of this key set.
///
/// Used by the dependent-read cap check at sequencer admission to bound
/// the total bytes that would be Raft-replicated in a `CalvinReadResult`
/// entry. This is an estimate, not an exact count; do NOT use it as a
/// correctness check — only as a pre-flight guard.
pub fn serialized_size_hint(&self) -> usize {
match self {
// u32 surrogates: 4 bytes each.
Self::Document { surrogates, .. } | Self::Vector { surrogates, .. } => {
surrogates.len() * 4
}
// KV keys: sum of key byte lengths.
Self::Kv { keys, .. } => keys.iter().map(|k| k.len()).sum(),
// Edge: two u32 per edge = 8 bytes each.
Self::Edge { edges, .. } => edges.len() * 8,
}
}
/// The collection this key set belongs to.
pub fn collection(&self) -> &str {
match self {
Self::Document { collection, .. }
| Self::Vector { collection, .. }
| Self::Kv { collection, .. }
| Self::Edge { collection, .. } => collection,
}
}
/// Returns `true` if this key set contains no keys.
pub fn is_empty(&self) -> bool {
match self {
Self::Document { surrogates, .. } => surrogates.is_empty(),
Self::Vector { surrogates, .. } => surrogates.is_empty(),
Self::Kv { keys, .. } => keys.is_empty(),
Self::Edge { edges, .. } => edges.is_empty(),
}
}
}
/// A single key that a passive participant must read and broadcast.
///
/// Wraps an [`EngineKeySet`]; per the dependent-read protocol each
/// `PassiveReadKey` contains a single-element (or small) key set. The
/// sequencer does not enforce single-element sets; the scheduler enforces the
/// total byte budget via `DependentReadSpec::total_bytes()` (which lives in
/// `nodedb-cluster`).
#[derive(
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct PassiveReadKey {
/// The engine key set to read on the passive vshard.
pub engine_key: EngineKeySet,
}