nodedb 0.3.0

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
// SPDX-License-Identifier: BUSL-1.1

//! Start the Raft event loop, RPC server, and both appliers.

use std::sync::{Arc, Mutex};
use std::time::Duration;

use tracing::info;

use nodedb_cluster::calvin::{
    CalvinCompletionRegistry, SEQUENCER_GROUP_ID, SequencerConfig, SequencerService,
    SequencerStateMachine, new_inbox,
};
use nodedb_cluster::distributed_array::ArrayLocalExecutor;
use nodedb_types::config::tuning::ClusterTransportTuning;

use crate::control::cluster::array_executor::DataPlaneArrayExecutor;
use crate::control::cluster::calvin::executor::ollp::OllpConfig;
use crate::control::cluster::calvin::executor::ollp::orchestrator::OllpOrchestrator;
use crate::control::cluster::calvin::{ReadResultEvent, SchedulerConfig};
use crate::control::cluster::handle::ClusterHandle;
use crate::control::cluster::metadata_applier::MetadataCommitApplier;
use crate::control::cluster::snapshot_hook::RaftSnapshotQuarantineHook;
use crate::control::cluster::spsc_applier::SpscCommitApplier;
use crate::control::cluster::start_raft_helpers::{build_vshard_handler, spawn_vshard_schedulers};
use crate::control::distributed_applier::{
    ProposeTracker, create_distributed_applier, run_apply_loop,
};
use crate::control::state::SharedState;

/// Start the Raft event loop and RPC server.
///
/// Must be called after `SharedState` is constructed (needs the WAL and
/// dispatcher for the `SpscCommitApplier`). Moves the `MultiRaft` out of
/// `handle.multi_raft` into the `RaftLoop`; must be called **exactly
/// once** per handle.
pub fn start_raft(
    handle: &ClusterHandle,
    shared: Arc<SharedState>,
    _data_dir: &std::path::Path,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
    transport_tuning: &ClusterTransportTuning,
) -> crate::Result<tokio::sync::watch::Receiver<bool>> {
    // Move the MultiRaft constructed by `start_cluster` into this
    // function. Rebuilding it here from the routing table would lose
    // learner membership for joining nodes and would double-open
    // per-group redb log files.
    let mut multi_raft = handle
        .multi_raft
        .lock()
        .unwrap_or_else(|p| p.into_inner())
        .take()
        .ok_or_else(|| crate::Error::Config {
            detail: "start_raft called twice: cluster multi_raft already consumed".into(),
        })?;

    let sequencer_peers: Vec<u64> = {
        let topo = handle.topology.read().unwrap_or_else(|p| p.into_inner());
        topo.all_nodes()
            .filter(|node| node.node_id != handle.node_id && node.state.receives_log())
            .map(|node| node.node_id)
            .collect()
    };
    multi_raft
        .add_group(SEQUENCER_GROUP_ID, sequencer_peers)
        .map_err(|e| crate::Error::Config {
            detail: format!("sequencer raft group add: {e}"),
        })?;

    // Build the propose tracker and distributed applier.
    //
    // The tracker is wired with the per-group apply watermark
    // registry so every `tracker.complete(group_id, idx, _)` call
    // also bumps the watcher — coupling the "data applied on this
    // node" signal to the single source of truth that proposers
    // and cross-node visibility waits both consume.
    let tracker =
        Arc::new(ProposeTracker::new().with_group_watchers(handle.group_watchers.clone()));
    let (dist_applier, apply_rx) = create_distributed_applier(tracker.clone());
    let dist_applier = Arc::new(dist_applier);
    let calvin_completion_registry = CalvinCompletionRegistry::new();
    let sequencer_state_machine = Arc::new(Mutex::new(SequencerStateMachine::new(
        std::collections::HashMap::new(),
        Arc::clone(&calvin_completion_registry),
    )));
    let calvin_read_result_senders = Arc::new(Mutex::new(std::collections::BTreeMap::<
        u32,
        tokio::sync::mpsc::Sender<ReadResultEvent>,
    >::new()));

    // Install the propose tracker so CP dispatch paths can await commit.
    if shared.propose_tracker.set(tracker.clone()).is_err() {
        tracing::warn!("propose_tracker already set — start_raft appears to have run twice");
    }

    let data_applier = SpscCommitApplier::new(
        shared.clone(),
        dist_applier,
        Arc::clone(&sequencer_state_machine),
    );

    // Production metadata applier: writes to the shared cache,
    // writes back to the `SystemCatalog` redb so every non-cache
    // reader observes the change, bumps the applied-index watcher,
    // broadcasts `CatalogChangeEvent`, and spawns Data Plane
    // `Register` dispatches on committed `CollectionDdl::Create`.
    let metadata_applier_concrete = Arc::new(MetadataCommitApplier::new(
        handle.metadata_cache.clone(),
        shared.catalog_change_tx.clone(),
        shared.credentials.clone(),
    ));
    // Install the Weak<SharedState> before the raft loop starts
    // ticking so no commit can reach the applier without it.
    metadata_applier_concrete.install_shared(Arc::downgrade(&shared));
    let metadata_applier: Arc<dyn nodedb_cluster::MetadataApplier> =
        metadata_applier_concrete.clone();

    // LocalPlanExecutor is the C-β physical-plan execution path (C-δ.6: sole execution path).
    let plan_executor = Arc::new(crate::control::LocalPlanExecutor::new(shared.clone()));

    // Build the real ArrayLocalExecutor that bridges incoming array shard RPCs
    // into the local Data Plane via the SPSC bridge.
    let array_executor: Arc<dyn ArrayLocalExecutor> =
        Arc::new(DataPlaneArrayExecutor::new(shared.clone()));

    let vshard_handler = build_vshard_handler(array_executor);

    let tick_interval = Duration::from_millis(transport_tuning.raft_tick_interval_ms);

    // Read snapshot-transfer config from the pending subsystem config before
    // the raft_loop is constructed (pending is consumed after the loop).
    let (snapshot_chunk_bytes, orphan_partial_max_age_secs) = {
        let guard = handle
            .pending_subsystems
            .lock()
            .unwrap_or_else(|p| p.into_inner());
        let cfg = guard.as_ref().ok_or_else(|| crate::Error::Config {
            detail: "start_raft called twice: pending_subsystems already consumed".into(),
        })?;
        (
            cfg.config.install_snapshot_chunk_bytes,
            cfg.config.orphan_partial_max_age_secs,
        )
    };

    let quarantine_hook = Arc::new(RaftSnapshotQuarantineHook {
        registry: Arc::clone(&shared.quarantine_registry),
    });

    let raft_loop = Arc::new(
        nodedb_cluster::RaftLoop::new(
            multi_raft,
            handle.transport.clone(),
            handle.topology.clone(),
            data_applier,
        )
        .with_plan_executor(plan_executor)
        .with_metadata_applier(metadata_applier)
        .with_vshard_handler(vshard_handler)
        .with_tick_interval(tick_interval)
        .with_group_watchers(handle.group_watchers.clone())
        .with_snapshot_quarantine_hook(quarantine_hook)
        .with_data_dir(_data_dir.to_path_buf())
        .with_snapshot_chunk_bytes(snapshot_chunk_bytes)
        .with_orphan_partial_max_age_secs(orphan_partial_max_age_secs),
    );

    // Spawn cluster subsystems now that the loop owns `MultiRaft`.
    // They share the same `Arc<Mutex<MultiRaft>>` the loop holds, so
    // shutdown is symmetric (subsystems are torn down before the
    // loop's strong ref drops). See `nodedb_cluster::start_cluster`
    // doc for the two-phase startup rationale.
    let pending = handle
        .pending_subsystems
        .lock()
        .unwrap_or_else(|p| p.into_inner())
        .take()
        .ok_or_else(|| crate::Error::Config {
            detail: "start_raft called twice: pending_subsystems already consumed".into(),
        })?;
    let raft_loop_handle = raft_loop.multi_raft_handle();

    let sequencer_config = SequencerConfig::default();
    let (sequencer_inbox, sequencer_inbox_rx) = new_inbox(10_000, &sequencer_config);
    let ollp_orchestrator = Arc::new(OllpOrchestrator::new(OllpConfig::default()));
    let mut sequencer_service = SequencerService::new(
        sequencer_config,
        handle.node_id,
        raft_loop_handle.clone(),
        sequencer_inbox_rx,
        sequencer_state_machine
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .next_epoch(),
        Arc::clone(&calvin_completion_registry),
    );
    let sequencer_metrics = Arc::clone(&sequencer_service.metrics);

    let scheduler_config = SchedulerConfig::default();
    spawn_vshard_schedulers(
        handle,
        &shared,
        raft_loop_handle.clone(),
        &sequencer_state_machine,
        &calvin_read_result_senders,
        &scheduler_config,
    )?;

    let running = tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current().block_on(nodedb_cluster::start_cluster_subsystems(
            &pending.config,
            Arc::clone(&handle.topology),
            Arc::clone(&handle.routing),
            Arc::clone(&handle.transport),
            raft_loop_handle,
        ))
    })
    .map_err(|e| crate::Error::Config {
        detail: format!("cluster subsystem start: {e}"),
    })?;
    *handle
        .running_cluster
        .lock()
        .unwrap_or_else(|p| p.into_inner()) = Some(running);

    // Wire the Raft proposer into SharedState so CP dispatch paths
    // (pgwire, HTTP, array inbound) can route writes through Raft.
    let raft_loop_for_propose = raft_loop.clone();
    let proposer: Arc<crate::control::wal_replication::RaftProposer> =
        Arc::new(move |vshard_id, data| {
            raft_loop_for_propose
                .propose(vshard_id, data)
                .map_err(|e| crate::Error::Internal {
                    detail: format!("raft propose: {e}"),
                })
        });
    if shared.raft_proposer.set(proposer).is_err() {
        tracing::warn!("raft_proposer already set — start_raft appears to have run twice");
    }

    // Install the async proposer with transparent leader forwarding.
    //
    // Proposes via the data group leader (forwarding to a remote leader if
    // needed), then registers a ProposeTracker waiter and awaits apply.
    //
    // The ProposeTracker is race-safe: if `run_apply_loop` calls complete()
    // before register() is called (possible on fast clusters where the entry
    // commits and applies on this node before the proposer returns), the
    // result is stored and register() picks it up immediately with no timeout.
    let raft_loop_async = raft_loop.clone();
    let tracker_for_proposer = tracker.clone();
    let deadline_secs = shared.tuning.network.default_deadline_secs;
    let async_proposer: Arc<crate::control::wal_replication::AsyncRaftProposer> =
        Arc::new(move |vshard_id, idempotency_key, data| {
            let rl = raft_loop_async.clone();
            let tk = tracker_for_proposer.clone();
            Box::pin(async move {
                let (group_id, log_index) = rl
                    .propose_via_data_leader(vshard_id, data)
                    .await
                    .map_err(|e| crate::Error::Internal {
                        detail: format!("raft propose (async): {e}"),
                    })?;

                // Register the waiter with the proposer's idempotency
                // key. The apply path compares against the committed
                // entry's key so a leader-change overwrite at the same
                // (group_id, log_index) — by either an empty no-op or a
                // different proposer's real entry — surfaces as
                // `RetryableLeaderChange` instead of leaking a
                // not-our-payload back to the caller.
                let rx = tk.register(group_id, log_index, idempotency_key);
                tokio::time::timeout(std::time::Duration::from_secs(deadline_secs), rx)
                    .await
                    .map_err(|_| crate::Error::Dispatch {
                        detail: format!(
                            "raft commit timeout for group {group_id} index {log_index}"
                        ),
                    })?
                    .map_err(|_| crate::Error::Dispatch {
                        detail: "propose waiter channel closed".into(),
                    })?
                    // Preserve `RetryableLeaderChange` so the gateway
                    // retry loop can re-propose against the new leader
                    // — wrapping it in `Dispatch` would hide the
                    // retryable signal and surface as silent INSERT
                    // success. Other errors stay wrapped for
                    // diagnostics.
                    .map_err(|e| match e {
                        crate::Error::RetryableLeaderChange { .. } => e,
                        other => crate::Error::Dispatch {
                            detail: format!("apply error: {other}"),
                        },
                    })
            })
        });
    if shared.async_raft_proposer.set(async_proposer).is_err() {
        tracing::warn!("async_raft_proposer already set — start_raft appears to have run twice");
    }

    // Spawn the background apply loop. It reads from the mpsc channel
    // pushed by `DistributedApplier::apply_committed`, dispatches to the
    // Data Plane, and notifies propose waiters.
    let apply_state = shared.clone();
    let apply_tracker = tracker.clone();
    let apply_calvin_read_result_senders = Arc::clone(&calvin_read_result_senders);
    let sr_apply = shutdown_rx.clone();
    tokio::spawn(async move {
        tokio::select! {
            _ = run_apply_loop(
                apply_rx,
                apply_state,
                apply_tracker,
                apply_calvin_read_result_senders,
            ) => {}
            _ = async {
                let mut rx = sr_apply;
                let _ = rx.changed().await;
            } => {}
        }
    });

    let _ = shared.sequencer_inbox.set(sequencer_inbox);
    let _ = shared.sequencer_metrics.set(sequencer_metrics);
    let _ = shared
        .calvin_completion_registry
        .set(calvin_completion_registry);
    let _ = shared.ollp_orchestrator.set(ollp_orchestrator);

    // Publish the cluster observability handle to SharedState before
    // any listener starts serving.
    let observer = Arc::new(nodedb_cluster::ClusterObserver::new(
        handle.node_id,
        handle.lifecycle.clone(),
        handle.topology.clone(),
        handle.routing.clone(),
        raft_loop.clone() as Arc<dyn nodedb_cluster::GroupStatusProvider + Send + Sync>,
    ));
    if shared.cluster_observer.set(observer).is_err() {
        tracing::warn!("cluster_observer already set — start_raft appears to have run twice");
    }

    // Publish the raft loop handle into SharedState so the metadata
    // proposer can reach it. The handle is type-erased behind a
    // trait object to keep the SharedState field concrete.
    let proposer_handle: Arc<dyn crate::control::metadata_proposer::MetadataRaftHandle> =
        Arc::new(crate::control::metadata_proposer::RaftLoopProposerHandle::new(raft_loop.clone()));
    if shared.metadata_raft.set(proposer_handle).is_err() {
        tracing::warn!("metadata_raft already set — start_raft appears to have run twice");
    }

    // Allow the surrogate assigner's flush path to propose
    // `SurrogateAlloc` entries to the Raft group so followers advance
    // their in-memory HWM on every checkpoint.
    shared
        .surrogate_assigner
        .install_shared(Arc::downgrade(&shared));

    // Subscribe to the boot-time readiness watch BEFORE spawning the
    // tick loop so we cannot miss the first transition. The receiver
    // is returned to `main.rs`, which awaits it before binding any
    // client-facing listener.
    let ready_rx = raft_loop.subscribe_ready();

    // Register the raft-tick loop's standardized metrics so the
    // `/metrics` route can expose them alongside every other driver.
    shared
        .loop_metrics_registry
        .register(raft_loop.loop_metrics());

    // Start the Raft tick loop.
    let rl_run = raft_loop.clone();
    let sr_raft = shutdown_rx.clone();
    tokio::spawn(async move {
        rl_run.run(sr_raft).await;
        info!("raft loop stopped");
    });

    let sr_sequencer = shutdown_rx.clone();
    tokio::spawn(async move {
        sequencer_service.run(sr_sequencer).await;
        info!("sequencer service stopped");
    });

    // Start the RPC server (accepts inbound QUIC connections).
    let transport_serve = handle.transport.clone();
    let rl_handler = raft_loop.clone();
    let sr_serve = shutdown_rx.clone();
    tokio::spawn(async move {
        if let Err(e) = transport_serve.serve(rl_handler, sr_serve).await {
            tracing::error!(error = %e, "raft RPC server failed");
        }
    });

    // Wire version of every node is now carried on the live
    // `NodeInfo` in `cluster_topology`. Log the derived view for observability.
    {
        let view = shared.cluster_version_view();
        let compat = crate::control::rolling_upgrade::should_compat_mode(&view);
        info!(
            node_id = handle.node_id,
            nodes = view.node_count,
            min_version = view.min_version,
            max_version = view.max_version,
            mixed = view.is_mixed_version(),
            compat_mode = compat,
            "cluster version view derived from topology"
        );
    }

    // Start the health monitor (periodic pings, failure detection,
    // topology re-broadcast).
    let health_config = nodedb_cluster::HealthConfig {
        ping_interval: Duration::from_secs(transport_tuning.health_ping_interval_secs),
        failure_threshold: transport_tuning.health_failure_threshold,
    };
    let health_monitor = Arc::new(nodedb_cluster::HealthMonitor::new(
        handle.node_id,
        handle.transport.clone(),
        handle.topology.clone(),
        handle.catalog.clone(),
        health_config,
    ));
    shared
        .loop_metrics_registry
        .register(health_monitor.loop_metrics());
    if shared.health_monitor.set(health_monitor.clone()).is_err() {
        tracing::warn!("health_monitor already set — start_raft appears to have run twice");
    }
    let sr_health = shutdown_rx;
    tokio::spawn(async move {
        health_monitor.run(sr_health).await;
    });

    info!(node_id = handle.node_id, "raft loop and RPC server started");

    Ok(ready_rx)
}