nodedb 0.2.0

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
// SPDX-License-Identifier: BUSL-1.1

//! [`ArrayFanout`] — fan out an applied array op to all matching subscribers.
//!
//! Called by the post-apply observer hook (see `apply.rs`) after an
//! `ArrayOp` has been durably committed to the Data Plane and op-log.
//!
//! # Flow
//!
//! 1. Extract the op's coord as `Vec<u64>` for range matching.
//! 2. Query `ShapeRegistry::evaluate_array_mutation` to find matching
//!    `(session_id, shape_id)` pairs.
//! 3. For each matching session:
//!   - Look up the subscriber cursor.
//!   - Check `cursor::should_send` (skips already-delivered ops).
//!   - Check `snapshot_trigger::check_and_trigger` (pivots to catch-up if
//!     cursor is below the GC boundary).
//!   - Encode the op as `ArrayDeltaMsg` and enqueue to the session's
//!     delivery channel.
//!   - Advance the cursor via `cursor::mark_sent`.
//!
//! # Thread safety
//!
//! `ArrayFanout` is `Send + Sync` and is designed to be wrapped in an
//! `Arc` and shared between the inbound session task and the post-apply
//! observer.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use nodedb_array::sync::hlc::Hlc;
use nodedb_array::sync::op::ArrayOp;
use tracing::debug;

use crate::control::server::sync::shape::registry::ShapeRegistry;

use super::cursor;
use super::delivery::ArrayDeliveryRegistry;
use super::merge::MergerRegistry;
use super::snapshot_trigger;
use super::subscriber_state::SubscriberMap;

/// Observer trait: called after each op is durably applied.
///
/// Injected into `OriginApplyEngine` at construction. `ArrayFanout`
/// implements this, but tests can inject a mock.
pub trait ArrayApplyObserver: Send + Sync {
    fn on_op_applied(&self, op: &ArrayOp);
}

/// Fan-out coordinator for applied array ops.
pub struct ArrayFanout {
    /// Shape registry: maps (tenant, array, coord) → matched sessions.
    shapes: Arc<ShapeRegistry>,
    /// Per-session outbound frame channels.
    delivery: Arc<ArrayDeliveryRegistry>,
    /// Per-subscriber HLC cursors.
    cursors: Arc<SubscriberMap>,
    /// Per-array GC boundary HLC. Updated by the GC task; read here to
    /// decide when to trigger catch-up for lagging subscribers.
    snapshot_hlcs: Arc<RwLock<HashMap<String, Hlc>>>,
    /// Cross-shard merger registry for HLC-ordered multi-shard delivery.
    ///
    /// When a subscriber's `coord_range` spans multiple vShards, each shard
    /// independently calls `on_op_applied`. The merger buffers ops from all
    /// shards and drains them in HLC order before forwarding to the session's
    /// delivery channel.
    mergers: Arc<MergerRegistry>,
    /// vShard ID of the shard emitting this fanout instance's ops.
    ///
    /// Passed to the merger so it can track per-shard watermarks.
    shard_id: u16,
    /// Tenant scope for this fanout instance.
    ///
    /// Carries the same limitation as `OriginArrayInbound`: the tenant is
    /// resolved post-auth but the fanout is constructed pre-auth (to share
    /// the snapshot-assembly buffer lifetime). Until Phase I wires per-tenant
    /// fanout instances, `tenant_id = 0` means only tenant-0 array shape
    /// subscriptions receive fan-out. Single-tenant deployments are unaffected.
    tenant_id: u64,
}

impl ArrayFanout {
    /// Construct from shared components.
    pub fn new(
        shapes: Arc<ShapeRegistry>,
        delivery: Arc<ArrayDeliveryRegistry>,
        cursors: Arc<SubscriberMap>,
        snapshot_hlcs: Arc<RwLock<HashMap<String, Hlc>>>,
        mergers: Arc<MergerRegistry>,
        shard_id: u16,
        tenant_id: u64,
    ) -> Self {
        Self {
            shapes,
            delivery,
            cursors,
            snapshot_hlcs,
            mergers,
            shard_id,
            tenant_id,
        }
    }

    /// Remove a session's cursors, delivery channel, and merger buffers.
    ///
    /// Called from the listener on disconnect.
    pub fn remove_session(&self, session_id: &str) {
        self.cursors.remove_session(session_id);
        self.delivery.unregister(session_id);
        self.mergers.remove_session(session_id);
    }

    /// Fan out a single applied op to all subscribed sessions.
    fn fan_out_op(&self, op: &ArrayOp) {
        let coord_u64 = coord_to_u64(&op.coord);
        let matches =
            self.shapes
                .evaluate_array_mutation(self.tenant_id, &op.header.array, &coord_u64);

        if matches.is_empty() {
            return;
        }

        for (session_id, _shape_id) in matches {
            // Encoding is deferred to the merger, which encodes once per
            // delivery call rather than once per fan-out recipient.
            self.deliver_to_session(&session_id, op, op.header.hlc, &[]);
        }
    }

    /// Deliver one op to a single session via the multi-shard merger.
    ///
    /// The merger buffers ops from all vShards and drains them in HLC order,
    /// ensuring subscribers see a consistent stream regardless of which shard
    /// the op originated from.
    fn deliver_to_session(&self, session_id: &str, op: &ArrayOp, op_hlc: Hlc, _op_payload: &[u8]) {
        let cursor = match self.cursors.get(session_id, &op.header.array) {
            Some(c) => c,
            None => {
                // Session has not registered for this array yet — skip.
                return;
            }
        };

        // Check if this op has already been delivered.
        if !cursor::should_send(op_hlc, cursor.last_pushed_hlc) {
            debug!(
                session = %session_id,
                array = %op.header.array,
                op_hlc = ?op_hlc,
                "array_fanout: op already delivered, skipping"
            );
            return;
        }

        // Check if the subscriber cursor has fallen behind the GC boundary.
        let snapshot_hlc = self
            .snapshot_hlcs
            .read()
            .ok()
            .and_then(|m| m.get(&op.header.array).copied())
            .unwrap_or(Hlc::ZERO);

        if snapshot_trigger::check_and_trigger(
            session_id,
            &op.header.array,
            cursor.last_pushed_hlc,
            snapshot_hlc,
            &self.delivery,
        ) {
            // Subscriber needs catch-up — do not send op stream frames.
            return;
        }

        // Route through the multi-shard merger for HLC-ordered delivery.
        let merger = self.mergers.get_or_create(session_id, &op.header.array);
        merger.push_op(self.shard_id, op.clone(), &self.delivery);

        // Advance the cursor so subsequent ops from any shard know the
        // current frontier.
        cursor::mark_sent(&self.cursors, session_id, &op.header.array, op_hlc);
    }
}

impl ArrayApplyObserver for ArrayFanout {
    fn on_op_applied(&self, op: &ArrayOp) {
        self.fan_out_op(op);
    }
}

/// Extract coordinate components as `u64` for range-matching against
/// `ArrayCoordRange`.
///
/// `CoordValue::Int64` and `CoordValue::TimestampMs` are cast to `u64`
/// via bitwise reinterpretation (same bit pattern), which is correct for
/// non-negative index spaces. Negative values and `Float64`/`String`
/// coordinates are coerced to `u64::MAX` so they sort at the top of any
/// range and are delivered to unbounded subscriptions.
fn coord_to_u64(coord: &[nodedb_array::types::coord::value::CoordValue]) -> Vec<u64> {
    use nodedb_array::types::coord::value::CoordValue;
    coord
        .iter()
        .map(|c| match c {
            CoordValue::Int64(v) | CoordValue::TimestampMs(v) => *v as u64,
            CoordValue::Float64(v) => v.to_bits(),
            CoordValue::String(_) => u64::MAX,
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_array::sync::op::{ArrayOpHeader, ArrayOpKind};
    use nodedb_array::sync::replica_id::ReplicaId;
    use nodedb_array::types::coord::value::CoordValue;
    use nodedb_types::sync::shape::{ArrayCoordRange, ShapeDefinition, ShapeType};
    use std::sync::Arc;

    use crate::control::server::sync::shape::registry::ShapeRegistry;

    fn replica() -> ReplicaId {
        ReplicaId::new(1)
    }

    fn hlc(ms: u64) -> Hlc {
        Hlc::new(ms, 0, replica()).unwrap()
    }

    fn make_op(array: &str, ms: u64) -> ArrayOp {
        ArrayOp {
            header: ArrayOpHeader {
                array: array.into(),
                hlc: hlc(ms),
                schema_hlc: hlc(1),
                valid_from_ms: 0,
                valid_until_ms: -1,
                system_from_ms: ms as i64,
            },
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(ms as i64)],
            attrs: None,
        }
    }

    fn make_fanout() -> (
        ArrayFanout,
        Arc<ShapeRegistry>,
        Arc<ArrayDeliveryRegistry>,
        Arc<SubscriberMap>,
    ) {
        use super::super::merge::MergerRegistry;
        use super::super::subscriber_state::SubscriberStore;
        let shapes = Arc::new(ShapeRegistry::new());
        let delivery = Arc::new(ArrayDeliveryRegistry::new());
        let store = SubscriberStore::in_memory().unwrap();
        let cursors = Arc::new(SubscriberMap::new(store));
        let snapshot_hlcs = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
        let mergers = Arc::new(MergerRegistry::new());
        let fanout = ArrayFanout::new(
            Arc::clone(&shapes),
            Arc::clone(&delivery),
            Arc::clone(&cursors),
            snapshot_hlcs,
            mergers,
            0,
            1,
        );
        (fanout, shapes, delivery, cursors)
    }

    #[tokio::test]
    async fn op_delivered_to_matching_subscriber() {
        let (fanout, shapes, delivery, cursors) = make_fanout();

        // Register subscriber.
        cursors.register("s1", "prices", None);
        let mut rx = delivery.register("s1".into());

        // Register shape subscription.
        shapes.subscribe(
            "s1",
            1,
            ShapeDefinition {
                shape_id: "sh1".into(),
                tenant_id: 1,
                shape_type: ShapeType::Array {
                    array_name: "prices".into(),
                    coord_range: None,
                },
                description: "all prices".into(),
                field_filter: vec![],
            },
        );

        let op = make_op("prices", 100);
        fanout.on_op_applied(&op);

        // Should have received one frame.
        let frame = rx.try_recv().expect("frame should be delivered");
        assert!(!frame.is_empty());
    }

    #[tokio::test]
    async fn op_not_delivered_to_wrong_array() {
        let (fanout, shapes, delivery, cursors) = make_fanout();

        cursors.register("s1", "prices", None);
        let mut rx = delivery.register("s1".into());

        shapes.subscribe(
            "s1",
            1,
            ShapeDefinition {
                shape_id: "sh1".into(),
                tenant_id: 1,
                shape_type: ShapeType::Array {
                    array_name: "other".into(),
                    coord_range: None,
                },
                description: "other".into(),
                field_filter: vec![],
            },
        );

        let op = make_op("prices", 100);
        fanout.on_op_applied(&op);

        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn op_not_delivered_when_coord_outside_range() {
        let (fanout, shapes, delivery, cursors) = make_fanout();

        cursors.register("s1", "mat", None);
        let mut rx = delivery.register("s1".into());

        shapes.subscribe(
            "s1",
            1,
            ShapeDefinition {
                shape_id: "sh1".into(),
                tenant_id: 1,
                shape_type: ShapeType::Array {
                    array_name: "mat".into(),
                    coord_range: Some(ArrayCoordRange {
                        start: vec![0],
                        end: Some(vec![9]),
                    }),
                },
                description: "narrow".into(),
                field_filter: vec![],
            },
        );

        // coord = [50], outside [0, 9]
        let op = ArrayOp {
            header: ArrayOpHeader {
                array: "mat".into(),
                hlc: hlc(200),
                schema_hlc: hlc(1),
                valid_from_ms: 0,
                valid_until_ms: -1,
                system_from_ms: 200,
            },
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(50)],
            attrs: None,
        };
        fanout.on_op_applied(&op);

        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn duplicate_op_not_redelivered() {
        let (fanout, shapes, delivery, cursors) = make_fanout();

        cursors.register("s1", "prices", None);
        let mut rx = delivery.register("s1".into());

        shapes.subscribe(
            "s1",
            1,
            ShapeDefinition {
                shape_id: "sh1".into(),
                tenant_id: 1,
                shape_type: ShapeType::Array {
                    array_name: "prices".into(),
                    coord_range: None,
                },
                description: "all".into(),
                field_filter: vec![],
            },
        );

        let op = make_op("prices", 100);
        fanout.on_op_applied(&op);
        let _first = rx.try_recv().expect("first delivery");

        // Replay same op — should not be re-delivered.
        fanout.on_op_applied(&op);
        assert!(rx.try_recv().is_err());
    }
}