1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
// SPDX-License-Identifier: BUSL-1.1
//! `CoreLoop` struct definition — all fields for the per-core Data Plane loop.
use std::collections::HashMap;
use std::sync::Arc;
use nodedb_bridge::buffer::{Consumer, Producer};
use crate::bridge::dispatch::{BridgeRequest, BridgeResponse};
use crate::control::array_catalog::ArrayCatalogHandle;
use crate::data::io::IoMetrics;
use crate::engine::array::ArrayEngine;
use crate::engine::crdt::tenant_state::TenantCrdtEngine;
use crate::engine::graph::edge_store::EdgeStore;
use crate::engine::sparse::btree::SparseEngine;
use crate::engine::sparse::doc_cache::DocCache;
use crate::engine::sparse::inverted::InvertedIndex;
use crate::engine::vector::collection::VectorCollection;
use crate::engine::vector::sparse::SparseInvertedIndex;
use crate::types::{Lsn, TenantId};
use nodedb_graph::ShardedCsrIndex;
use nodedb_types::{DatabaseId, OrdinalClock};
use super::priority_queues::PriorityQueues;
/// Per-core event loop for the Data Plane.
///
/// Each CPU core runs one `CoreLoop`. It owns:
/// - SPSC consumer for incoming requests from the Control Plane
/// - SPSC producer for outgoing responses to the Control Plane
/// - Per-core `SparseEngine` (redb) for point lookups and range scans
/// - Per-tenant `TenantCrdtEngine` instances (lazy-initialized)
/// - Task queue for pending execution
/// - The remaining per-engine state declared below (vector, graph, spatial,
///   columnar/timeseries, KV, array), plus caches, tuning parameters,
///   encryption keys and optional handles shared with other planes
///
/// Most fields are visible only inside `crate::data::executor`; the few
/// `pub(crate)` fields (channels, task queue, metrics, watermark, sparse
/// engine, pressure state) are reachable from the rest of the crate.
///
/// This type is intentionally `!Send` — pinned to a single core.
pub struct CoreLoop {
/// Identifier of the core this loop belongs to; returned by `core_id()`.
pub(in crate::data::executor) core_id: usize,
/// SPSC channel: receives requests from Control Plane.
pub(in crate::data::executor) request_rx: Consumer<BridgeRequest>,
/// SPSC channel: sends responses to Control Plane.
pub(crate) response_tx: Producer<BridgeResponse>,
/// Three-tier priority task queue (Critical / High / Low).
///
/// Drain budget per 14-slot cycle: 8 Critical : 4 High : 2 Low.
/// Empty tiers donate unused slots to the next lower tier.
/// `pending_count()` reports this queue's `len()`.
pub(crate) task_queue: PriorityQueues,
/// Position within the current 14-slot drain cycle.
/// Passed by mutable reference to `PriorityQueues::pop_next` so the
/// ratio is maintained across multiple calls inside a single `tick()`.
pub(crate) drain_cycle: usize,
/// Per-priority IO queue-depth and wait-latency metrics.
///
/// Shared via `Arc` with the Control Plane Prometheus handler so the
/// HTTP endpoint can read live values without crossing the plane boundary
/// through `SystemMetrics`.
pub(crate) io_metrics: Arc<IoMetrics>,
/// Current watermark LSN for this core's shard data.
/// Overwritten with the supplied value by `advance_watermark()`.
pub(crate) watermark: Lsn,
/// redb-backed sparse/metadata engine for this core.
pub(crate) sparse: SparseEngine,
/// Per-tenant CRDT engines, lazily initialized on first access.
pub(in crate::data::executor) crdt_engines: HashMap<TenantId, TenantCrdtEngine>,
/// Per-collection vector collections, lazily initialized on first insert.
/// Key: `(TenantId, collection_key)` where `collection_key` is `collection`
/// or `"{collection}:{field_name}"` for named fields.
pub(in crate::data::executor) vector_collections: HashMap<(TenantId, String), VectorCollection>,
/// Background HNSW builder: send requests.
pub(in crate::data::executor) build_tx: Option<crate::engine::vector::builder::BuildSender>,
/// Background HNSW builder: receive completed builds.
pub(in crate::data::executor) build_rx:
Option<crate::engine::vector::builder::CompleteReceiver>,
/// Per-collection HNSW parameters set via DDL. If a collection has no
/// entry here, `HnswParams::default()` is used on first insert.
/// Key: `(TenantId, collection_key)` — same shape as `vector_collections`.
pub(in crate::data::executor) vector_params:
HashMap<(TenantId, String), crate::engine::vector::hnsw::HnswParams>,
/// redb-backed graph edge storage for this core.
pub(in crate::data::executor) edge_store: EdgeStore,
/// Strictly-monotonic ordinal clock for bitemporal `system_from` suffixes.
/// Shared across all Data Plane cores so edge keys are globally ordered
/// even under concurrent multi-core writes.
pub(in crate::data::executor) hlc: Arc<OrdinalClock>,
/// HLC watermark for `_ts_system` stamping (see `bitemporal_time.rs`).
pub(in crate::data::executor) last_stamp_ms: std::sync::atomic::AtomicI64,
/// Per-tenant in-memory CSR adjacency index, rebuilt from
/// edge_store on startup. Each tenant's graph state lives in its
/// own `CsrIndex` partition — no shared key space, no lexical
/// `<tid>:` prefix anywhere in memory.
pub(in crate::data::executor) csr: ShardedCsrIndex,
/// Full-text inverted index (BM25), shares redb with sparse engine.
pub(in crate::data::executor) inverted: InvertedIndex,
/// Per-collection spatial R-tree indexes, keyed by (TenantId, collection, field).
/// Lazily initialized when a spatial query or geometry insert first targets a field.
pub(in crate::data::executor) spatial_indexes:
std::collections::HashMap<(TenantId, String, String), crate::engine::spatial::RTree>,
/// Reverse map from R-tree entry ID → document ID,
/// keyed by (TenantId, collection, field, entry_id).
pub(in crate::data::executor) spatial_doc_map:
std::collections::HashMap<(TenantId, String, String, u64), String>,
/// Base data directory for this core (used for sort spill temp files).
pub(in crate::data::executor) data_dir: std::path::PathBuf,
/// vShards that are paused for write operations (during Phase 3 migration cutover).
pub(in crate::data::executor) paused_vshards: std::collections::HashSet<crate::types::VShardId>,
/// Nodes that have been explicitly deleted via PointDelete cascade,
/// keyed per-tenant. Used for edge referential integrity —
/// `EdgePut` to a deleted node is rejected with
/// `RejectedDanglingEdge`. Cleared periodically or on compaction.
///
/// Stored as `HashMap<TenantId, HashSet<UnscopedNodeName>>`: one
/// set per tenant, entries are raw user-visible names (plain `String`
/// in the declared type). This is the
/// last piece of state in `CoreLoop` that used to live as a flat
/// scoped-string tracker; it's now structurally tenant-partitioned
/// like every other graph concern.
pub(in crate::data::executor) deleted_nodes:
HashMap<TenantId, std::collections::HashSet<String>>,
/// Idempotency key deduplication: maps processed idempotency keys to
/// whether they succeeded (true) or failed (false). This `HashMap` gives
/// O(1) lookup; the companion `idempotency_order` field (a `VecDeque`)
/// records FIFO eviction order.
/// Bounded to 16,384 entries.
pub(in crate::data::executor) idempotency_cache: HashMap<u64, bool>,
/// FIFO order of idempotency keys for correct eviction (oldest first).
/// Companion to `idempotency_cache`.
pub(in crate::data::executor) idempotency_order: std::collections::VecDeque<u64>,
/// Column statistics store for CBO. Shares redb with sparse engine.
/// Updated incrementally on PointPut. Read by DataFusion optimizer.
pub(in crate::data::executor) stats_store: crate::engine::sparse::stats::StatsStore,
/// Incremental aggregate cache: maps `(TenantId, cache_key)` →
/// partial aggregate state. Updated on writes (PointPut increments counts/sums),
/// cleared on schema change. Turns O(N) full-scan aggregates into O(1) cache
/// lookups for repeated dashboard/analytics queries.
///
/// Key: `(TenantId, "{collection}\0{group_by_fields}\0{agg_ops}")`.
/// Value: cached result rows as JSON.
pub(in crate::data::executor) aggregate_cache: HashMap<(TenantId, String), Vec<u8>>,
/// Last time periodic maintenance (compaction, edge sweep) was run.
pub(in crate::data::executor) last_maintenance: Option<std::time::Instant>,
/// Per-collection full index config (includes index_type, PQ params, IVF params).
/// Stored alongside vector_params for collections that use non-default index types.
/// Key: `(TenantId, collection_key)` — same shape as `vector_collections`.
pub(in crate::data::executor) index_configs:
HashMap<(TenantId, String), crate::engine::vector::index_config::IndexConfig>,
/// IVF-PQ indexes for collections configured with `index_type = "ivf_pq"`.
/// Key: `(TenantId, collection_key)` — same shape as `vector_collections`.
pub(in crate::data::executor) ivf_indexes:
HashMap<(TenantId, String), crate::engine::vector::ivf::IvfPqIndex>,
/// Per-collection sparse vector inverted indexes, keyed by (TenantId, collection, field).
/// The field is `"_sparse"` when no named field is specified.
pub(in crate::data::executor) sparse_vector_indexes:
HashMap<(TenantId, String, String), SparseInvertedIndex>,
/// Compaction interval (how often `maybe_run_maintenance` triggers).
pub(in crate::data::executor) compaction_interval: std::time::Duration,
/// Tombstone ratio threshold for auto-compaction (0.0–1.0).
pub(in crate::data::executor) compaction_tombstone_threshold: f64,
/// Per-core LRU document cache for O(1) hot-key point lookups.
/// Invalidated write-through on PointPut/Delete/Update.
pub(in crate::data::executor) doc_cache: DocCache,
/// Per-collection columnar timeseries memtables (!Send, per-core owned).
/// Key: (TenantId, collection).
pub(in crate::data::executor) columnar_memtables:
HashMap<(TenantId, String), crate::engine::timeseries::columnar_memtable::ColumnarMemtable>,
/// Per-collection columnar mutation engines for plain/spatial profiles.
/// Uses `nodedb-columnar`'s `MutationEngine` with full INSERT/UPDATE/DELETE.
/// Key: (TenantId, collection).
pub(in crate::data::executor) columnar_engines:
HashMap<(TenantId, String), nodedb_columnar::MutationEngine>,
/// Flushed columnar segment bytes, keyed by (TenantId, collection).
/// Each entry is a list of encoded segment buffers produced by `SegmentWriter`.
/// Kept in memory so `scan_columnar` can read rows that were drained from the
/// active memtable during a flush (otherwise those rows would be lost until a
/// real on-disk segment reader is wired up).
pub(in crate::data::executor) columnar_flushed_segments:
HashMap<(TenantId, String), Vec<Vec<u8>>>,
/// Per-collection max WAL LSN that has been ingested into the memtable.
/// Used by the WAL catch-up deduplication: if a catch-up record's LSN
/// is <= this value, the Data Plane skips it (already ingested).
/// Key: (TenantId, collection).
pub(in crate::data::executor) ts_max_ingested_lsn: HashMap<(TenantId, String), u64>,
/// Last time any timeseries ingest was processed on this core.
/// Used by idle flush: if no ingest for 5 seconds, `maybe_run_maintenance`
/// flushes all non-empty memtables to disk partitions.
pub(in crate::data::executor) last_ts_ingest: Option<std::time::Instant>,
/// Per-collection last-value caches for O(1) recent value lookup.
/// Key: (TenantId, collection).
pub(in crate::data::executor) ts_last_value_caches:
HashMap<(TenantId, String), crate::engine::timeseries::last_value_cache::LastValueCache>,
/// Per-collection timeseries partition registries for this core.
/// Key: (TenantId, collection).
pub(in crate::data::executor) ts_registries: HashMap<
(TenantId, String),
crate::engine::timeseries::partition_registry::PartitionRegistry,
>,
/// Continuous aggregate manager for this core. Fires on memtable flush.
pub(in crate::data::executor) continuous_agg_mgr:
crate::engine::timeseries::continuous_agg::ContinuousAggregateManager,
/// Checkpoint coordinator: incremental dirty page flushing across engines.
/// Replaces timer-based checkpoint with I/O-budget-aware progressive flush.
pub(in crate::data::executor) checkpoint_coordinator:
crate::storage::checkpoint::CheckpointCoordinator,
/// L1 segment compaction config for the storage layer.
pub(in crate::data::executor) segment_compaction_config:
crate::storage::compaction::CompactionConfig,
/// Per-collection document index configurations.
/// Maps (TenantId, collection) → CollectionConfig.
/// Populated via RegisterDocumentCollection plans.
pub(in crate::data::executor) doc_configs:
HashMap<(TenantId, String), crate::engine::document::store::CollectionConfig>,
/// Per-collection last chain hash for HASH_CHAIN collections.
/// Maps (TenantId, collection) → last SHA-256 hash.
pub(in crate::data::executor) chain_hashes: HashMap<(TenantId, String), String>,
/// Query execution tuning parameters (sort run size, stream chunk size, etc.).
/// Set at core spawn time from config; never changed at runtime.
pub(in crate::data::executor) query_tuning: nodedb_types::config::tuning::QueryTuning,
/// Graph engine tuning parameters (max_visited, max_depth, LCC thresholds).
/// Set at core spawn time from config; never changed at runtime.
pub(in crate::data::executor) graph_tuning: nodedb_types::config::tuning::GraphTuning,
/// Per-core KV engine: hash tables + expiry wheel. `!Send`.
pub(in crate::data::executor) kv_engine: crate::engine::kv::KvEngine,
/// Per-core ND-array engine. Owns one LSM store per registered
/// array (`open_array`). The Control Plane allocates WAL LSNs and
/// the engine just stamps the supplied LSN into the memtable —
/// see `ArrayEngine::{put_cells, delete_cells, flush}`.
pub(in crate::data::executor) array_engine: ArrayEngine,
/// Shared array catalog handle — the Control Plane's registered
/// array metadata. The Data Plane consults this (read-only) when
/// resolving array names to `ArrayId` + schema digests during
/// dispatch.
pub(in crate::data::executor) array_catalog: ArrayCatalogHandle,
/// Per-core io_uring reader for batched columnar segment reads.
/// Initialized lazily; `None` if io_uring is not available.
pub(in crate::data::executor) uring_reader: Option<crate::data::io::uring_reader::UringReader>,
/// Encryption key for at-rest encryption of vector checkpoints.
///
/// When `Some`, `checkpoint_vector_indexes` writes encrypted checkpoint
/// files and `load_vector_checkpoints` refuses to load plaintext ones.
/// Sourced from the same WAL key used by `nodedb-wal` and snapshot writers.
pub(in crate::data::executor) vector_checkpoint_kek:
Option<nodedb_wal::crypto::WalEncryptionKey>,
/// Encryption key for at-rest encryption of spatial (R-tree and geohash) checkpoints.
///
/// When `Some`, `checkpoint_spatial_indexes` writes encrypted checkpoint files
/// and `load_spatial_checkpoints` refuses to load plaintext ones.
pub(in crate::data::executor) spatial_checkpoint_kek:
Option<nodedb_wal::crypto::WalEncryptionKey>,
/// Encryption key for at-rest encryption of columnar segments.
///
/// When `Some`, columnar segment flushes wrap the segment bytes in an
/// AES-256-GCM SEGC envelope and the reader refuses to load plaintext
/// segments.
pub(in crate::data::executor) columnar_segment_kek:
Option<nodedb_wal::crypto::WalEncryptionKey>,
/// Encryption key for at-rest encryption of array (NDAS) segments.
///
/// When `Some`, array segment flushes wrap the segment bytes in an
/// AES-256-GCM SEGA envelope and the segment handle refuses to load
/// plaintext segments.
pub(in crate::data::executor) array_segment_kek: Option<nodedb_wal::crypto::WalEncryptionKey>,
/// Memory governor for per-engine budget enforcement.
pub(in crate::data::executor) governor: Option<Arc<nodedb_mem::MemoryGovernor>>,
/// Shared per-database maintenance CPU budget tracker.
///
/// Used by all maintenance sites (`run_compaction` and friends) to gate
/// per-database background work against the quota's `maintenance_cpu_pct`.
/// Set by `set_maintenance_budget` after core spawn.
pub(in crate::data::executor) maintenance_budget:
Option<Arc<crate::control::maintenance::MaintenanceBudgetTracker>>,
/// Mapping from `TenantId` to the `DatabaseId` that owns the tenant.
///
/// Populated lazily as requests arrive (sourced from `task.request.database_id`).
/// Used by the maintenance budget tracker to look up per-database caps when
/// iterating collections keyed by `TenantId`.
pub(in crate::data::executor) tenant_database_map: HashMap<TenantId, DatabaseId>,
/// Current SPSC drain batch size, adjusted by memory pressure.
///
/// Normal: 64. Critical: halved (floor 1). Emergency: 0 (new reads
/// suspended until pressure clears). Restored with hysteresis after
/// `PRESSURE_NORMAL_HYSTERESIS` consecutive Normal/Warning iterations.
pub(crate) spsc_read_depth: usize,
/// When `true` the core loop does not drain new SPSC requests.
/// Set on Emergency pressure; cleared when pressure drops to Critical
/// or below (then normal hysteresis restores `spsc_read_depth`).
pub(crate) pressure_suspend_reads: bool,
/// Consecutive ticks at Normal/Warning pressure since last Critical/Emergency
/// transition. Used for hysteresis before restoring `spsc_read_depth`.
pub(crate) pressure_normal_ticks: u32,
/// Per-collection jemalloc arena registry.
///
/// Shared with the Control Plane for stats queries. Vector-primary
/// collections request a dedicated arena via `get_or_create`; other
/// collections use the per-core arena from `nodedb_mem::arena`.
/// `None` until wired by the server bootstrap or test harness.
pub(in crate::data::executor) collection_arena_registry:
Option<std::sync::Arc<nodedb_mem::CollectionArenaRegistry>>,
/// Shared system metrics — Arc is safe for `!Send` since all fields are atomic.
pub(in crate::data::executor) metrics: Option<Arc<crate::control::metrics::SystemMetrics>>,
/// Event bus producer: emits WriteEvents to the Event Plane.
/// One per core, `!Send` once pinned. `None` if Event Plane is disabled.
pub(in crate::data::executor) event_producer: Option<crate::event::bus::EventProducer>,
/// Monotonic sequence counter for events emitted by this core.
/// Incremented on every successful event emission.
pub(in crate::data::executor) event_sequence: u64,
/// Shared collection-scoped scan-quiesce registry.
///
/// When set, every scan handler on this core calls
/// `quiesce.try_start_scan(tenant, collection)` at entry and holds
/// the resulting `ScanGuard` across the row stream. A concurrent
/// `PurgeCollection` post-apply flow calls `begin_drain` +
/// `wait_until_drained` on the same registry, so the unlink pass
/// only runs once every in-flight scan has released.
///
/// `None` in test / no-cluster bringup paths: callers then skip
/// the gate and scan unconditionally (matching pre-quiesce
/// behavior). In the server bootstrap path `main.rs` wires the
/// shared registry via `set_quiesce` after `SharedState::open`.
pub(in crate::data::executor) quiesce:
Option<std::sync::Arc<crate::bridge::quiesce::CollectionQuiesce>>,
/// Encryption key for at-rest encryption of timeseries columnar segment files
/// (`.col`, `.sym`, `schema.json`, `sparse_index.bin`, `partition.meta`).
///
/// When `Some`, `flush_ts_collection` writes SEGT-encrypted files; readers
/// refuse to load plaintext segment files.
pub(in crate::data::executor) ts_segment_kek: Option<nodedb_wal::crypto::WalEncryptionKey>,
/// Shared quarantine registry for corrupt segments.
///
/// `Arc` is `Send + Sync` so it is safe to hold on a `!Send` core.
/// `None` until wired by the server bootstrap via `set_quarantine_registry`.
pub(in crate::data::executor) quarantine_registry:
Option<std::sync::Arc<crate::storage::quarantine::QuarantineRegistry>>,
/// In-flight concurrent index rebuilds, polled each tick.
///
/// Each entry is a `(collection_key, receiver)` pair. The receiver
/// yields a `RebuildResult` once the background OS thread finishes
/// the shadow build. Only one rebuild per collection may be in
/// progress at a time; `execute_rebuild_index` returns
/// `ErrorCode::Conflict` when a second is attempted.
pub(in crate::data::executor) pending_reindex:
Vec<crate::data::executor::handlers::control::reindex::PendingReindex>,
/// Ambient deterministic timestamp for the current Calvin epoch.
///
/// Set to `Some(ms)` by `execute_calvin_execute_static` and
/// `execute_calvin_execute_active` before dispatching the inner
/// transaction batch, then reset to `None` immediately after.
/// Engine handlers that need "current time" (bitemporal sys_from, KV TTL
/// expire_at, timeseries system_ms) call
/// `self.epoch_system_ms.unwrap_or_else(<wall_clock_read>)` so that
/// single-shard (non-Calvin) paths continue working without change.
///
/// Safety: this is safe because `CoreLoop` is `!Send` and single-threaded
/// per core. Sub-plans inside `execute_transaction_batch` do not recurse
/// back into a Calvin execute variant, so the reset after the batch is not
/// premature.
pub(in crate::data::executor) epoch_system_ms: Option<i64>,
}
impl CoreLoop {
    /// Returns the identifier stored in this loop's `core_id` field.
    pub fn core_id(&self) -> usize {
        let Self { core_id, .. } = self;
        *core_id
    }

    /// Returns the length reported by the priority task queue
    /// (`task_queue.len()`).
    pub fn pending_count(&self) -> usize {
        let queue = &self.task_queue;
        queue.len()
    }

    /// Replaces the stored watermark with `lsn`.
    ///
    /// The assignment is unconditional: no comparison against the previous
    /// watermark is performed here, so callers are responsible for passing
    /// a value that makes sense for them.
    pub fn advance_watermark(&mut self, lsn: Lsn) {
        let Self { watermark, .. } = self;
        *watermark = lsn;
    }
}