nodedb 0.3.0

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
// SPDX-License-Identifier: BUSL-1.1

//! Production metadata-group commit applier.
//!
//! Single branch for DDL: decode the opaque `CatalogDdl { payload }`
//! as a host-side [`CatalogEntry`], write through to `SystemCatalog`
//! redb via [`catalog_entry::apply_to`], and spawn the post-apply
//! side effects (Data Plane register, sequence registry sync, etc.).
//! All 16 per-DDL-object types are handled by adding a variant to
//! `CatalogEntry` — nothing in this file changes per type.
//!
//! The applier broadcasts `CatalogChangeEvent` (for future
//! prepared-statement / catalog cache invalidation). The per-group
//! apply watermark is maintained by the Raft tick loop directly via
//! [`nodedb_cluster::GroupAppliedWatchers`] — the applier no longer
//! owns its own watcher because that primitive is now keyed by
//! `group_id` and shared across every Raft group on the node.

use std::sync::{Arc, OnceLock, RwLock, Weak};

use tokio::sync::broadcast;
use tracing::{debug, warn};

use nodedb_cluster::{MetadataApplier, MetadataCache, MetadataEntry, decode_entry};

use crate::control::catalog_entry;
use crate::control::security::credential::CredentialStore;
use crate::control::state::SharedState;

use metadata_applier_audit::{apply_ca_trust_change, emit_ddl_audit};

#[path = "metadata_applier_audit.rs"]
mod metadata_applier_audit;

/// Capacity for the catalog-change broadcast channel — small,
/// because consumers are internal subsystems that keep up or are
/// lagged intentionally.
///
/// NOTE(review): this constant is not referenced anywhere in this
/// file; presumably it is passed to `broadcast::channel` at the site
/// that creates the applier's sender — confirm against the caller.
pub const CATALOG_CHANNEL_CAPACITY: usize = 64;

/// Event broadcast after a batch of committed metadata entries has
/// been applied.
///
/// One event is sent per non-empty `apply` call, not one per entry.
#[derive(Debug, Clone)]
pub struct CatalogChangeEvent {
    /// Raft log index of the last entry in the batch that was just
    /// applied.
    pub applied_index: u64,
}

/// Production `MetadataApplier` installed on the `RaftLoop`.
pub struct MetadataCommitApplier {
    /// Cluster-owned metadata cache. Every decoded entry is applied
    /// to it under the write lock before the host-side effects run.
    cache: Arc<RwLock<MetadataCache>>,
    /// Sender half of the catalog-change broadcast. One
    /// `CatalogChangeEvent` is sent per non-empty applied batch; the
    /// send result is ignored.
    catalog_change_tx: broadcast::Sender<CatalogChangeEvent>,
    /// Credential store whose `catalog()` handle is the target of the
    /// DDL write-through and of the surrogate high-watermark persist.
    credentials: Arc<CredentialStore>,
    /// Weak handle to `SharedState`. Installed by `start_raft` after
    /// construction so the applier can spawn async post-apply side
    /// effects (Data Plane register on `PutCollection`,
    /// `sequence_registry.create` on `PutSequence`, etc.). Weak to
    /// break the Arc cycle (SharedState → raft loop → applier →
    /// SharedState). Left unset in unit tests.
    shared: OnceLock<Weak<SharedState>>,
}

impl MetadataCommitApplier {
    pub fn new(
        cache: Arc<RwLock<MetadataCache>>,
        catalog_change_tx: broadcast::Sender<CatalogChangeEvent>,
        credentials: Arc<CredentialStore>,
    ) -> Self {
        Self {
            cache,
            catalog_change_tx,
            credentials,
            shared: OnceLock::new(),
        }
    }

    /// Install a weak handle to `SharedState` so the applier can
    /// spawn post-apply side effects. Must be called **before** the
    /// raft loop starts ticking; `start_raft` does this as part of
    /// its construction sequence.
    pub fn install_shared(&self, shared: Weak<SharedState>) {
        let _ = self.shared.set(shared);
    }

    /// Apply a single decoded `MetadataEntry`'s host-side effects.
    ///
    /// - `CatalogDdl` → decode payload as `CatalogEntry`, write
    ///   through to redb via `catalog_entry::apply_to`, spawn async
    ///   post-apply side effects if `SharedState` is reachable.
    /// - Non-DDL variants (topology, routing, lease, version) have
    ///   no host-side redb effects in this crate — the cluster crate
    ///   already tracks them in the `MetadataCache`.
    fn apply_host_side_effects(&self, entry: &MetadataEntry, raft_index: u64) {
        // Atomic batches unpack one level: the sub-entries are
        // applied individually so each gets its own audit record
        // stamped with the same raft_index (they committed at the
        // same log position).
        if let MetadataEntry::Batch { entries } = entry {
            for sub in entries {
                self.apply_host_side_effects(sub, raft_index);
            }
            return;
        }

        // Handle non-CatalogDdl variants that still have host-side
        // effects. Drain start/end land on `shared.lease_drain` on
        // every node so the next `force_refresh_lease` check sees
        // the replicated drain state.
        match entry {
            MetadataEntry::DescriptorDrainStart {
                descriptor_id,
                up_to_version,
                expires_at,
            } => {
                if let Some(weak) = self.shared.get()
                    && let Some(shared) = weak.upgrade()
                {
                    shared.lease_drain.install_start(
                        descriptor_id.clone(),
                        *up_to_version,
                        *expires_at,
                    );
                    debug!(
                        descriptor = ?descriptor_id,
                        up_to_version,
                        "drain_start applied to host tracker"
                    );
                }
                return;
            }
            MetadataEntry::DescriptorDrainEnd { descriptor_id } => {
                if let Some(weak) = self.shared.get()
                    && let Some(shared) = weak.upgrade()
                {
                    shared.lease_drain.install_end(descriptor_id);
                    debug!(
                        descriptor = ?descriptor_id,
                        "drain_end applied to host tracker"
                    );
                }
                return;
            }
            MetadataEntry::CaTrustChange {
                add_ca_cert,
                remove_ca_fingerprint,
            } => {
                if let Some(weak) = self.shared.get()
                    && let Some(shared) = weak.upgrade()
                {
                    apply_ca_trust_change(
                        &shared,
                        add_ca_cert.as_deref(),
                        remove_ca_fingerprint.as_ref(),
                        raft_index,
                    );
                }
                return;
            }
            MetadataEntry::SurrogateAlloc { hwm } => {
                // Advance the in-memory surrogate high-watermark on every
                // node. `restore_hwm` is idempotent and monotonic: calling
                // it with a value at or below the current HWM is a no-op,
                // so duplicate or reordered delivery cannot push the
                // counter backwards. Also persist the hwm to the catalog so
                // the local node survives a restart without re-reading the
                // full log.
                if let Some(weak) = self.shared.get()
                    && let Some(shared) = weak.upgrade()
                {
                    let reg = shared
                        .surrogate_assigner
                        .registry_handle()
                        .read()
                        .unwrap_or_else(|p| p.into_inner());
                    if let Err(e) = reg.restore_hwm(*hwm) {
                        warn!(hwm, error = %e, "surrogate_alloc apply: restore_hwm failed");
                    }
                    drop(reg);
                    // Best-effort catalog persist: a failure means the
                    // next restart will re-derive the HWM from the log,
                    // which is correct — just slightly slower. We must
                    // not block the apply loop on catalog I/O.
                    if let Some(catalog) = self.credentials.catalog()
                        && let Err(e) = catalog.put_surrogate_hwm(*hwm)
                    {
                        warn!(
                            hwm,
                            error = %e,
                            "surrogate_alloc apply: failed to persist hwm to catalog"
                        );
                    }
                    debug!(hwm, raft_index, "surrogate hwm advanced via raft");
                }
                return;
            }
            _ => {}
        }

        let Some(catalog) = self.credentials.catalog() else {
            return;
        };
        let (payload, audit) = match entry {
            MetadataEntry::CatalogDdl { payload } => (payload, None),
            MetadataEntry::CatalogDdlAudited {
                payload,
                auth_user_id,
                auth_user_name,
                sql_text,
            } => (
                payload,
                Some((
                    auth_user_id.clone(),
                    auth_user_name.clone(),
                    sql_text.clone(),
                )),
            ),
            _ => return,
        };
        let catalog_entry = match catalog_entry::decode(payload) {
            Ok(e) => e,
            Err(e) => {
                warn!(error = %e, "metadata applier: failed to decode CatalogEntry payload");
                return;
            }
        };

        // Stamp `descriptor_version` + `modification_hlc` for every
        // `Put*` variant that carries them. Gated on the rolling
        // upgrade flag — in mixed-version clusters the older nodes
        // do not have the stamp logic, so we leave the entry's
        // sentinel `0` / `Hlc::ZERO` in place and let resolvers
        // treat it as "unknown, always re-fetch". Only the proposing
        // node's apply path observes the gate; followers run the
        // same applier and will reach the same conclusion because
        // every node observes the same live topology (same
        // `wire_version` on every NodeInfo, replicated via the
        // gossip path).
        let stamped = if let Some(weak) = self.shared.get()
            && let Some(shared) = weak.upgrade()
        {
            let compat = !shared.cluster_version_view().can_activate_feature(
                crate::control::rolling_upgrade::DESCRIPTOR_VERSIONING_VERSION,
            );
            if compat {
                catalog_entry
            } else {
                catalog_entry::descriptor_stamp::stamp(catalog_entry, &shared.hlc_clock, catalog)
            }
        } else {
            // Unit tests construct the applier without a SharedState.
            // They don't care about descriptor versioning — leave
            // the entry unstamped so existing assertions on
            // `descriptor_version == 0` still hold.
            catalog_entry
        };

        debug!(kind = stamped.kind(), "catalog_entry: applying to redb");
        catalog_entry::apply::apply_to(&stamped, catalog);
        // Implicit drain clear: if the entry is a `Put*` for one
        // of the six stamped descriptor types, the DDL that was
        // waiting on drain has now committed — remove the drain
        // entry from every node's host tracker. Happens before
        // post_apply so a subsequent `acquire_lease` fired from
        // post_apply doesn't see a stale drain.
        if let Some(weak) = self.shared.get()
            && let Some(shared) = weak.upgrade()
        {
            if let Some(drained_id) =
                crate::control::lease::drain_propose::descriptor_id_for_implicit_clear(&stamped)
            {
                shared.lease_drain.install_end(&drained_id);
            }
            // Run synchronous post-apply side effects INLINE so every
            // in-memory cache update (install_replicated_user,
            // install_replicated_owner, etc.) is visible before the
            // watcher bump. Any reader that observes `applied_index`
            // moving past `last` is guaranteed to see the sync side
            // effects of every entry up to `last`.
            //
            // `PutCollection` Register dispatch runs synchronously
            // (block_in_place) inside spawn_post_apply_async_side_effects
            // and IS part of the applied-index contract: the watcher
            // only bumps after doc_configs is populated on every core,
            // so subsequent scans always find the schema.
            catalog_entry::post_apply::apply_post_apply_side_effects_sync(&stamped, &shared);

            // Emit a DdlChange audit record on every replica.
            // Executed BEFORE spawning async post-apply side effects
            // so the audit entry lands synchronously with the rest of
            // the commit.
            emit_ddl_audit(&shared, raft_index, &stamped, audit.as_ref());

            catalog_entry::post_apply::spawn_post_apply_async_side_effects(
                stamped, shared, raft_index,
            );
        }
    }
}

impl MetadataApplier for MetadataCommitApplier {
    /// Apply committed `(raft_index, encoded_entry)` pairs in order
    /// and return the index of the final pair, or `0` for an empty
    /// slice.
    ///
    /// Pairs with an empty payload, and pairs whose payload fails to
    /// decode (logged at `warn`), are skipped; their index still
    /// counts toward the returned value. After a non-empty slice one
    /// `CatalogChangeEvent` carrying the final index is broadcast.
    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
        for (index, data) in entries {
            if data.is_empty() {
                continue;
            }
            match decode_entry(data) {
                Ok(entry) => {
                    // 1. Cluster-owned cache state (topology, routing,
                    //    leases, catalog_entries_applied counter). The
                    //    write guard is a temporary of this statement,
                    //    so the lock is released before step 2.
                    self.cache
                        .write()
                        .unwrap_or_else(|poisoned| poisoned.into_inner())
                        .apply(*index, &entry);
                    // 2. Host side effects (redb writeback + async
                    //    post-apply).
                    self.apply_host_side_effects(&entry, *index);
                }
                Err(e) => {
                    warn!(index = *index, error = %e, "metadata decode failed");
                }
            }
        }

        // Index of the final pair, regardless of whether it was
        // skipped above.
        let last = entries.last().map_or(0, |(index, _)| *index);
        if last == 0 {
            return last;
        }

        // The Raft tick loop bumps the per-group apply watcher
        // directly after `advance_applied`; this applier only
        // owns the catalog-change broadcast.
        let event = CatalogChangeEvent {
            applied_index: last,
        };
        let _ = self.catalog_change_tx.send(event);
        debug!(
            applied_index = last,
            "metadata applier broadcast catalog-change event"
        );
        last
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::DatabaseId;

    use crate::control::catalog_entry::CatalogEntry;
    use crate::control::security::catalog::StoredCollection;
    use nodedb_cluster::encode_entry;

    /// Build an applier backed by a fresh on-disk credential store in
    /// a temp directory. No `SharedState` is installed. The temp dir
    /// is returned so it outlives the test body.
    fn make_applier() -> (
        MetadataCommitApplier,
        Arc<RwLock<MetadataCache>>,
        Arc<CredentialStore>,
        tempfile::TempDir,
    ) {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let store = CredentialStore::open(&tmp.path().join("system.redb")).expect("open");
        let credentials = Arc::new(store);
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let (tx, _rx) = broadcast::channel(16);
        let applier = MetadataCommitApplier::new(Arc::clone(&cache), tx, Arc::clone(&credentials));
        (applier, cache, credentials, tmp)
    }

    /// Wrap a host-side `CatalogEntry` in an unaudited `CatalogDdl`.
    fn ddl_entry(entry: &CatalogEntry) -> MetadataEntry {
        MetadataEntry::CatalogDdl {
            payload: catalog_entry::encode(entry).unwrap(),
        }
    }

    /// `PutCollection` DDL for tenant 7, owned by "tester".
    fn put_collection_entry(name: &str) -> MetadataEntry {
        let put = CatalogEntry::PutCollection(Box::new(StoredCollection::new(7, name, "tester")));
        ddl_entry(&put)
    }

    #[test]
    fn apply_put_collection_writes_through_to_redb() {
        let (applier, cache, credentials, _tmp) = make_applier();
        let batch = [(
            11,
            encode_entry(&put_collection_entry("orders")).unwrap(),
        )];
        assert_eq!(applier.apply(&batch), 11);

        // Cache side: index advanced and one catalog entry counted.
        {
            let cache_guard = cache.read().unwrap();
            assert_eq!(cache_guard.applied_index, 11);
            assert_eq!(cache_guard.catalog_entries_applied, 1);
        }

        // Catalog side: the collection record is readable back.
        let loaded = credentials
            .catalog()
            .as_ref()
            .unwrap()
            .get_collection(DatabaseId::DEFAULT, 7, "orders")
            .unwrap()
            .expect("present");
        assert_eq!(loaded.name, "orders");
        assert_eq!(loaded.owner, "tester");
    }

    #[test]
    fn apply_deactivate_preserves_record() {
        let (applier, _cache, credentials, _tmp) = make_applier();

        // Seed the collection at index 1.
        let seed = [(
            1,
            encode_entry(&put_collection_entry("archived")).unwrap(),
        )];
        applier.apply(&seed);

        // Deactivate it at index 2.
        let deactivate = ddl_entry(&CatalogEntry::DeactivateCollection {
            tenant_id: 7,
            name: "archived".into(),
        });
        let follow_up = [(2, encode_entry(&deactivate).unwrap())];
        applier.apply(&follow_up);

        // The record must still exist, flagged inactive.
        let loaded = credentials
            .catalog()
            .as_ref()
            .unwrap()
            .get_collection(DatabaseId::DEFAULT, 7, "archived")
            .unwrap()
            .expect("preserved");
        assert!(!loaded.is_active);
    }

    #[test]
    fn apply_empty_batch_is_noop() {
        let (applier, _cache, _credentials, _tmp) = make_applier();
        let applied = applier.apply(&[]);
        assert_eq!(applied, 0);
    }
}