amaters_cluster/key_rotation.rs
//! Key rotation for [`LogEncryptionKey`]s.
//!
//! Each [`crate::encryption::EncryptedPayload`] carries the [`KeyVersion`] it
//! was encrypted under so that decryption can look up the historical key
//! even after the master key has been rotated. The current key (highest
//! version) is always used for new encryptions; older keys are retained
//! while they are among the most recent `retention` versions and are
//! dropped once they fall outside that window.
//!
//! ## Background rotation task
//!
//! Automatic, time-based rotation via a `tokio` task is **deferred** to a
//! future cycle. The [`KeyManager::rotate`] API is wired so that an external
//! scheduler (for example, an admin RPC handler or a timer driven from the
//! cluster event loop) can call it directly. See the `cluster.encryption`
//! config fields in [`crate::config::NodeConfig`] for the configuration
//! surface that already exists for when the task is added.
17
18use std::collections::BTreeMap;
19
20use crate::encryption::LogEncryptionKey;
21
/// Version number identifying a [`LogEncryptionKey`]; it only ever grows.
///
/// The value is stored in
/// [`crate::encryption::EncryptedPayload::key_version`] so the matching
/// historical key can be found at decryption time. Real keys start at
/// version 1; version 0 is set aside as the "legacy / unset" sentinel for
/// payloads written before rotation existed.
pub type KeyVersion = u32;

/// Sentinel version carried by payloads that pre-date key rotation,
/// i.e. that were serialized before the `key_version` field existed.
pub const LEGACY_KEY_VERSION: KeyVersion = 0;
33
34// ──────────────────────────────────────────────
35// KeyManager
36// ──────────────────────────────────────────────
37
38/// Manages the rolling window of [`LogEncryptionKey`]s used by an
39/// [`crate::encryption::EntryEncryptor`].
40///
41/// The "current" key is always used for encryption; on rotation, the old
42/// current key is moved into `history` and a new key takes its place.
43/// `history` is bounded by `retention` (oldest entries are dropped first
44/// once the bound is exceeded). Decryption looks up the right key by the
45/// [`KeyVersion`] embedded in the payload.
46///
47/// `retention` of `1` means only the current key is kept; rotating then
48/// immediately invalidates the previous key. `retention` of `N` means at
49/// most `N - 1` historical keys plus the current key are retained at any
50/// time (so we can decrypt entries from the most recent `N` versions).
51///
52/// `retention` is silently clamped to `>= 1` at construction time.
53pub struct KeyManager {
54 current_version: KeyVersion,
55 current: LogEncryptionKey,
56 /// Map from version → historical key (does **not** include the
57 /// current key). Bounded by `retention - 1`.
58 history: BTreeMap<KeyVersion, LogEncryptionKey>,
59 /// Maximum total versions kept (current + history); always `>= 1`.
60 retention: usize,
61}
62
63impl KeyManager {
64 /// Build a new [`KeyManager`] with `initial` as the current key at
65 /// [`KeyVersion`] `1`.
66 ///
67 /// `retention` is clamped to `>= 1`; that is, at minimum the current
68 /// key is always kept. `retention = 3` means current + 2 historical
69 /// keys are retained.
70 pub fn new(initial: LogEncryptionKey, retention: usize) -> Self {
71 let retention = retention.max(1);
72 Self {
73 current_version: 1,
74 current: initial,
75 history: BTreeMap::new(),
76 retention,
77 }
78 }
79
80 /// Rotate to a new master key, returning the new current version.
81 ///
82 /// The previous current key is moved into `history`. When the
83 /// combined size of (current + history) exceeds `retention`, the
84 /// oldest historical entry is dropped.
85 pub fn rotate(&mut self, new_key: LogEncryptionKey) -> KeyVersion {
86 // Move the current key (and its version) into history.
87 let prev_version = self.current_version;
88 let prev_key = std::mem::replace(&mut self.current, new_key);
89 self.history.insert(prev_version, prev_key);
90
91 // Advance to the new version.
92 self.current_version = self
93 .current_version
94 .checked_add(1)
95 .unwrap_or(KeyVersion::MAX);
96
97 // Bound history so total kept versions == retention. We always
98 // count the current key as one of the retained slots, so we keep
99 // at most `retention - 1` historical entries.
100 let max_history = self.retention.saturating_sub(1);
101 while self.history.len() > max_history {
102 // Drop the oldest entry (smallest version).
103 if let Some((&oldest_version, _)) = self.history.iter().next() {
104 self.history.remove(&oldest_version);
105 } else {
106 break;
107 }
108 }
109
110 self.current_version
111 }
112
113 /// The current key, paired with its version.
114 pub fn current(&self) -> (KeyVersion, &LogEncryptionKey) {
115 (self.current_version, &self.current)
116 }
117
118 /// Look up the key with `version`, falling back to historical entries.
119 ///
120 /// Returns `None` if `version` is older than the retained window
121 /// (already pruned) or has never existed.
122 pub fn lookup(&self, version: KeyVersion) -> Option<&LogEncryptionKey> {
123 if version == self.current_version {
124 return Some(&self.current);
125 }
126 self.history.get(&version)
127 }
128
129 /// Number of versions currently retained (current + history).
130 ///
131 /// Always `>= 1` because the current key is always present, so
132 /// `KeyManager` does not expose an `is_empty` method.
133 pub fn version_count(&self) -> usize {
134 1 + self.history.len()
135 }
136
137 /// Configured retention (always `>= 1`).
138 pub fn retention(&self) -> usize {
139 self.retention
140 }
141}
142
143// ──────────────────────────────────────────────
144// Tests
145// ──────────────────────────────────────────────
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a deterministic 32-byte key in which every byte is `byte`.
    fn key(byte: u8) -> LogEncryptionKey {
        LogEncryptionKey::new([byte; 32])
    }

    /// Every rotation returns, and then reports as current, the next version.
    #[test]
    fn test_key_manager_rotation_advances_version() {
        let mut manager = KeyManager::new(key(0x01), 3);
        assert_eq!(manager.current().0, 1, "initial current version is 1");

        let second = manager.rotate(key(0x02));
        assert_eq!(second, 2, "rotation increments to version 2");
        let (live, _) = manager.current();
        assert_eq!(live, 2);

        let third = manager.rotate(key(0x03));
        assert_eq!(third, 3);
        let (live, _) = manager.current();
        assert_eq!(live, 3);
    }

    /// With `retention = 3`, the current key and both predecessors resolve.
    #[test]
    fn test_key_manager_lookup_returns_current_and_history() {
        let mut manager = KeyManager::new(key(0xaa), 3);
        for byte in [0xbb, 0xcc] {
            let _ = manager.rotate(key(byte));
        }

        // v3 (0xcc) is current; v2 (0xbb) and v1 (0xaa) live in history.
        for version in [3, 2, 1] {
            assert!(manager.lookup(version).is_some());
        }
        assert!(
            manager.lookup(99).is_none(),
            "non-existent version returns None"
        );
    }

    /// With `retention = 2`, only the current key and one predecessor survive.
    #[test]
    fn test_key_manager_retention_drops_oldest() {
        let mut manager = KeyManager::new(key(0xaa), 2);
        // After these two rotations: current = v3 (0xcc), history = {v2 (0xbb)}.
        for byte in [0xbb, 0xcc] {
            let _ = manager.rotate(key(byte));
        }

        assert!(manager.lookup(3).is_some(), "current v3 retained");
        assert!(manager.lookup(2).is_some(), "previous v2 retained");
        assert!(
            manager.lookup(1).is_none(),
            "oldest v1 dropped past retention"
        );

        // One more rotation: current = v4, history = {v3}; v2 falls out.
        let _ = manager.rotate(key(0xdd));
        for version in [4, 3] {
            assert!(manager.lookup(version).is_some());
        }
        assert!(manager.lookup(2).is_none());
    }

    /// A requested retention of `0` is raised to `1`.
    #[test]
    fn test_key_manager_retention_clamped_to_one() {
        let mut manager = KeyManager::new(key(0x10), 0);
        assert_eq!(manager.retention(), 1);

        // Retention 1 leaves no room for history at all.
        let _ = manager.rotate(key(0x20));
        assert!(manager.lookup(2).is_some(), "current v2 retained");
        assert!(manager.lookup(1).is_none(), "v1 dropped immediately");
    }

    /// `version_count` grows with each rotation until it reaches `retention`.
    #[test]
    fn test_key_manager_version_count_grows_then_caps() {
        let mut manager = KeyManager::new(key(0x01), 3);
        assert_eq!(manager.version_count(), 1, "single key after construction");

        // Expected counts after each rotation: 2, 3, then capped at 3.
        for (byte, expected) in [(0x02, 2), (0x03, 3), (0x04, 3), (0x05, 3)] {
            let _ = manager.rotate(key(byte));
            assert_eq!(manager.version_count(), expected);
        }
    }
}
232}