nodedb_cluster/metadata_group/cache.rs

// SPDX-License-Identifier: BUSL-1.1

//! Per-node in-memory view of the replicated metadata state.
//!
//! The cache tracks everything `nodedb-cluster` natively understands:
//! the applied raft index, the HLC watermark, topology / routing
//! change history, descriptor leases, and cluster version. It
//! **does not** maintain per-DDL-object descriptor state — that
//! lives on the host side via
//! `nodedb::control::catalog_entry::CatalogEntry::apply_to` writing
//! into `SystemCatalog` redb. The `CatalogDdl { payload }` variant
//! is opaque here: the cache tracks its applied index and forwards
//! the payload to the host's `MetadataCommitApplier`.
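//!
//! A minimal sketch of the apply path (`raw_bytes` is a hypothetical
//! opaque payload; `CatalogDdl`'s payload type is host-defined, so
//! this example is illustrative only):
//!
//! ```ignore
//! let mut cache = MetadataCache::new();
//! cache.apply(1, &MetadataEntry::CatalogDdl { payload: raw_bytes });
//! assert_eq!(cache.catalog_entries_applied, 1); // payload stays opaque here
//! ```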

use std::collections::HashMap;

use nodedb_types::Hlc;
use tracing::{debug, info, warn};

use crate::error::{ClusterError, MigrationCheckpointError};
use crate::metadata_group::compensation::Compensation;
use crate::metadata_group::descriptors::{DescriptorId, DescriptorLease};
use crate::metadata_group::entry::{MetadataEntry, RoutingChange, TopologyChange};
use crate::metadata_group::migration_state::{
    MigrationPhaseTag, PersistedMigrationCheckpoint, SharedMigrationStateTable,
};
use crate::routing::RoutingTable;

/// In-memory view of the committed metadata state.
#[derive(Debug, Default)]
pub struct MetadataCache {
    pub applied_index: u64,
    pub last_applied_hlc: Hlc,

    /// `(descriptor_id, node_id) -> lease`.
    pub leases: HashMap<(DescriptorId, u64), DescriptorLease>,

    /// Topology mutations applied so far.
    pub topology_log: Vec<TopologyChange>,
    pub routing_log: Vec<RoutingChange>,

    pub cluster_version: u16,
    /// Monotonically increasing count of committed `CatalogDdl`
    /// entries. Exposed for tests and metrics — planners read
    /// catalog state through the host-side `SystemCatalog`, not
    /// this counter.
    pub catalog_entries_applied: u64,
}

impl MetadataCache {
    pub fn new() -> Self {
        Self::default()
    }

    /// Apply a committed entry. Idempotent by `applied_index`:
    /// entries at or below the current watermark are ignored.
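    ///
    /// A sketch of the replay guard (`entry` stands for any committed
    /// `MetadataEntry`; construction elided):
    ///
    /// ```ignore
    /// cache.apply(7, &entry);
    /// cache.apply(7, &entry); // no-op: 7 <= current watermark
    /// ```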
    pub fn apply(&mut self, index: u64, entry: &MetadataEntry) {
        if index != 0 && index <= self.applied_index {
            debug!(
                index,
                watermark = self.applied_index,
                "metadata cache: skipping already-applied entry"
            );
            return;
        }
        self.applied_index = index;
        self.apply_entry(entry);
    }

    /// Applies an entry's effects without the `applied_index` guard.
    /// `Batch` sub-entries recurse through here: a guarded `apply`
    /// would reject them, since the batch's index is already the
    /// watermark by the time they are visited.
    fn apply_entry(&mut self, entry: &MetadataEntry) {
        match entry {
            MetadataEntry::CatalogDdl { payload: _ }
            | MetadataEntry::CatalogDdlAudited { payload: _, .. } => {
                // Opaque to the cluster crate. The host-side applier
                // decodes the payload and writes through to
                // `SystemCatalog`. We just count it — both DDL
                // shapes contribute to `catalog_entries_applied`.
                self.catalog_entries_applied += 1;
            }
            MetadataEntry::TopologyChange(change) => self.topology_log.push(change.clone()),
            MetadataEntry::RoutingChange(change) => self.routing_log.push(change.clone()),

            MetadataEntry::ClusterVersionBump { from, to } => {
                if *from != self.cluster_version && self.cluster_version != 0 {
                    warn!(
                        expected = self.cluster_version,
                        got = *from,
                        "cluster version bump mismatch"
                    );
                }
                self.cluster_version = *to;
            }

            MetadataEntry::DescriptorLeaseGrant(lease) => {
                if lease.expires_at > self.last_applied_hlc {
                    self.last_applied_hlc = lease.expires_at;
                }
                self.leases
                    .insert((lease.descriptor_id.clone(), lease.node_id), lease.clone());
            }
            MetadataEntry::DescriptorLeaseRelease {
                node_id,
                descriptor_ids,
            } => {
                for id in descriptor_ids {
                    self.leases.remove(&(id.clone(), *node_id));
                }
            }
            // Drain state is host-side (it lives in
            // `nodedb::control::lease::DescriptorDrainTracker`);
            // the cluster-side cache only tracks lease state
            // directly. Aside from advancing the HLC watermark on
            // drain start, these arms are no-ops; keeping the match
            // exhaustive makes adding a new variant a compile-time
            // error here too.
            MetadataEntry::DescriptorDrainStart { expires_at, .. } => {
                if *expires_at > self.last_applied_hlc {
                    self.last_applied_hlc = *expires_at;
                }
            }
            MetadataEntry::DescriptorDrainEnd { .. } => {}
            MetadataEntry::CaTrustChange { .. } => {
                // CA trust mutations are host-side only: the production
                // applier in the nodedb crate writes/deletes
                // `tls/ca.d/<fp>.crt` and rebuilds the rustls config.
                // Cluster cache has nothing to track.
            }
            MetadataEntry::SurrogateAlloc { .. } => {
                // Surrogate HWM advance is host-side only: the production
                // applier calls `SurrogateRegistry::restore_hwm`. The
                // cluster cache has no surrogate state to track.
            }
            MetadataEntry::JoinTokenTransition { .. } => {
                // Token lifecycle transitions are enforced by the bootstrap-
                // listener handler at apply time. The cluster cache records
                // the applied index but carries no per-token state — the
                // host-side token store is authoritative.
            }
            MetadataEntry::Batch { entries } => {
                for sub in entries {
                    // Recurse without the watermark guard; see
                    // `apply_entry`'s doc comment.
                    self.apply_entry(sub);
                }
            }
            // Migration checkpoint/abort upserts are handled by the
            // live-state applier (`CacheApplier`) which holds the
            // `SharedMigrationStateTable` handle. The cache itself
            // has no migration state to track beyond `applied_index`.
            MetadataEntry::MigrationCheckpoint { .. } => {}
            MetadataEntry::MigrationAbort { .. } => {}
        }
    }
}

// ── Migration live-state application helpers ─────────────────────────────────

/// Apply a `MigrationCheckpoint` entry against the shared state table.
///
/// Returns `Err` on CRC32C mismatch (fatal — corruption is louder than data
/// loss) and on storage failures. Idempotent: if
/// `(migration_id, phase, attempt)` is already present, the call is a no-op.
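///
/// A usage sketch (`table`, `id`, `phase`, `attempt`, `payload`, and
/// `now_ms` are placeholders; recomputing the CRC from the same payload
/// guarantees the check passes):
///
/// ```ignore
/// let crc = payload.crc32c()?;
/// apply_migration_checkpoint(&table, id, phase, attempt, payload, crc, now_ms)?;
/// ```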
pub fn apply_migration_checkpoint(
    table: &SharedMigrationStateTable,
    migration_id: uuid::Uuid,
    _phase: MigrationPhaseTag,
    attempt: u32,
    payload: crate::metadata_group::migration_state::MigrationCheckpointPayload,
    expected_crc: u32,
    ts_ms: u64,
) -> Result<(), ClusterError> {
    // Validate CRC32C against the encoded payload bytes.
    let actual_crc = payload.crc32c()?;
    if actual_crc != expected_crc {
        return Err(ClusterError::MigrationCheckpoint(
            MigrationCheckpointError::Crc32cMismatch {
                migration_id,
                expected: expected_crc,
                actual: actual_crc,
            },
        ));
    }

    let row = PersistedMigrationCheckpoint {
        migration_id: migration_id.hyphenated().to_string(),
        attempt,
        payload,
        crc32c: actual_crc,
        ts_ms,
    };

    let mut guard = table.lock().unwrap_or_else(|p| p.into_inner());
    guard.upsert(row)
}

/// Apply a `MigrationAbort` entry against the shared state table and
/// live routing table.
///
/// Applies each compensation in order; any failure is fatal (no
/// warn-and-continue — a partial abort is as broken as a partial commit).
/// On success, removes the migration row from the state table.
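///
/// A usage sketch (`table`, `routing`, `id`, and `comps` are
/// placeholders):
///
/// ```ignore
/// apply_migration_abort(&table, Some(&routing), id, "split timed out", &comps)?;
/// ```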
pub fn apply_migration_abort(
    table: &SharedMigrationStateTable,
    routing: Option<&std::sync::Arc<std::sync::RwLock<RoutingTable>>>,
    migration_id: uuid::Uuid,
    reason: &str,
    compensations: &[Compensation],
) -> Result<(), ClusterError> {
    info!(
        migration_id = %migration_id,
        reason,
        steps = compensations.len(),
        "applying migration abort"
    );

    for (step, comp) in compensations.iter().enumerate() {
        apply_compensation(routing, step, migration_id, comp)?;
    }

    let mut guard = table.lock().unwrap_or_else(|p| p.into_inner());
    guard.remove(&migration_id)
}

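/// Apply one compensation step against the live routing table. With no
/// routing handle attached the step is logged and treated as success;
/// that path is only reachable in unit tests without live state.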
fn apply_compensation(
    routing: Option<&std::sync::Arc<std::sync::RwLock<RoutingTable>>>,
    step: usize,
    migration_id: uuid::Uuid,
    comp: &Compensation,
) -> Result<(), ClusterError> {
    let Some(live) = routing else {
        // No routing handle attached — log and treat as success.
        // This path is only reachable in unit tests without live state.
        debug!(
            migration_id = %migration_id,
            step,
            ?comp,
            "compensation: no live routing handle, skipping"
        );
        return Ok(());
    };

    let mut rt = live.write().unwrap_or_else(|p| p.into_inner());
    match comp {
        Compensation::RemoveLearner { group_id, peer_id }
        | Compensation::RemoveVoter { group_id, peer_id } => {
            rt.remove_group_member(*group_id, *peer_id);
        }
        Compensation::RestoreLeaderHint { group_id, peer_id } => {
            rt.set_leader(*group_id, *peer_id);
        }
        Compensation::RemoveGhostStub { vshard_id: _ } => {
            // Ghost stub removal is handled by the caller's ghost_table;
            // the routing table has no ghost state.
        }
    }
    drop(rt);

    debug!(
        migration_id = %migration_id,
        step,
        ?comp,
        "compensation applied"
    );
    Ok(())
}
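
#[cfg(test)]
mod batch_replay_sketch {
    //! A sketch-level test of the batch/replay semantics of
    //! `MetadataCache::apply`. It assumes `ClusterVersionBump` and
    //! `Batch` are constructible exactly as matched above (with
    //! `Batch.entries` a `Vec<MetadataEntry>`); adjust the
    //! constructors to the real `MetadataEntry` definition.
    use super::*;

    #[test]
    fn batch_subentries_apply_and_replays_are_skipped() {
        let mut cache = MetadataCache::new();
        let batch = MetadataEntry::Batch {
            entries: vec![MetadataEntry::ClusterVersionBump { from: 0, to: 1 }],
        };
        cache.apply(1, &batch);
        assert_eq!(cache.applied_index, 1);
        assert_eq!(cache.cluster_version, 1);

        // A replay at the same index must be a no-op.
        cache.apply(1, &MetadataEntry::ClusterVersionBump { from: 1, to: 2 });
        assert_eq!(cache.cluster_version, 1);
    }
}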