nodedb 0.2.0

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
// SPDX-License-Identifier: BUSL-1.1

//! Clone CoW write-path interception for the pgwire handler.
//!
//! Hooked into `dispatch_task_loop` before the normal "dispatch_task" call.
//! For `PointUpdate` and `PointDelete` targeting a `Shadowed` or `Materializing`
//! clone, applies the copy-up / tombstone protocol so the source database is
//! never modified.
//!
//! Non-cloned collections and `Materialized` clones return `None` — zero overhead.

use std::sync::Arc;
use std::time::Duration;

use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};

use nodedb_types::{CloneStatus, DatabaseId, Lsn, Surrogate, TenantId};

use crate::bridge::envelope::{Priority, Request, Response, Status};
use crate::bridge::physical_plan::{DocumentOp, KvOp, PhysicalPlan};
use crate::control::clone::copyup::{
    CopyUpParams, KvCopyUpParams, perform_clone_copyup, perform_kv_clone_copyup,
};
use crate::control::clone::tombstone::{
    KvTombstoneParams, TombstoneParams, perform_clone_tombstone, perform_kv_clone_tombstone,
};
use crate::control::planner::physical::PhysicalTask;
use crate::control::state::SharedState;
use crate::types::{ReadConsistency, RequestId, TraceId, VShardId};

use super::super::core::NodeDbPgHandler;

/// Outcome of write-path clone interception.
///
/// Produced by `NodeDbPgHandler::maybe_intercept_clone_write` and consumed by
/// the pgwire dispatch loop to decide whether the original task still needs
/// to be dispatched.
pub(super) enum CloneWriteOutcome {
    /// No interception needed — caller must dispatch normally.
    ///
    /// Also returned after a successful copy-up, so the original write then
    /// applies to the row that was just copied into the target.
    Passthrough,
    /// The write was fully handled by the clone path. Caller uses this response
    /// instead of dispatching the original task.
    Handled(Response),
}

impl NodeDbPgHandler {
    /// Intercept a single write task for a cloned collection.
    ///
    /// Must be called for every `PointUpdate` and `PointDelete` task before
    /// normal dispatch. Returns `Passthrough` when the collection is not a
    /// Shadowed/Materializing clone (zero overhead for non-clone paths).
    pub(super) async fn maybe_intercept_clone_write(
        &self,
        task: &PhysicalTask,
        tenant_id: TenantId,
    ) -> PgWireResult<CloneWriteOutcome> {
        match &task.plan {
            PhysicalPlan::Document(DocumentOp::PointUpdate { .. })
            | PhysicalPlan::Document(DocumentOp::PointDelete { .. }) => {
                self.intercept_doc_clone_write(task, tenant_id).await
            }
            PhysicalPlan::Kv(KvOp::FieldSet { .. }) | PhysicalPlan::Kv(KvOp::Delete { .. }) => {
                self.intercept_kv_clone_write(task, tenant_id).await
            }
            _ => Ok(CloneWriteOutcome::Passthrough),
        }
    }

    /// Handle Document CoW write interception (PointUpdate / PointDelete).
    async fn intercept_doc_clone_write(
        &self,
        task: &PhysicalTask,
        tenant_id: TenantId,
    ) -> PgWireResult<CloneWriteOutcome> {
        let (collection_qualified, document_id, surrogate, is_delete) = match &task.plan {
            PhysicalPlan::Document(DocumentOp::PointUpdate {
                collection,
                document_id,
                surrogate,
                ..
            }) => (collection.as_str(), document_id.as_str(), *surrogate, false),
            PhysicalPlan::Document(DocumentOp::PointDelete {
                collection,
                document_id,
                surrogate,
                ..
            }) => (collection.as_str(), document_id.as_str(), *surrogate, true),
            _ => return Ok(CloneWriteOutcome::Passthrough),
        };

        let catalog_arc = self.state.credentials.catalog();
        let Some(catalog) = catalog_arc.as_ref() else {
            return Ok(CloneWriteOutcome::Passthrough);
        };

        let db_id = task.database_id;
        let coll_name = strip_db_prefix(db_id, collection_qualified);

        let desc = catalog
            .get_collection(db_id, tenant_id.as_u64(), coll_name)
            .map_err(|e| write_err(&format!("clone write: get_collection: {e}")))?;
        let Some(desc) = desc else {
            return Ok(CloneWriteOutcome::Passthrough);
        };

        let Some(ref origin) = desc.cloned_from else {
            return Ok(CloneWriteOutcome::Passthrough);
        };
        match desc.clone_status {
            CloneStatus::Materialized => return Ok(CloneWriteOutcome::Passthrough),
            CloneStatus::Shadowed | CloneStatus::Materializing { .. } => {}
        }

        let row_in_target = probe_row_in_target(
            &self.state,
            tenant_id,
            db_id,
            collection_qualified,
            document_id,
            surrogate,
        )
        .await
        .map_err(|e| write_err(&format!("clone write probe: {e}")))?;

        if row_in_target {
            return Ok(CloneWriteOutcome::Passthrough);
        }

        if is_delete {
            perform_clone_tombstone(TombstoneParams {
                state: &self.state,
                target_db_id: db_id,
                target_collection: coll_name,
                source_surrogate: surrogate,
            })
            .map_err(|e| write_err(&format!("clone tombstone: {e}")))?;

            let synthetic_resp = synthetic_ok_response(self.next_request_id(), Lsn::new(0));
            return Ok(CloneWriteOutcome::Handled(synthetic_resp));
        }

        let source_db_id = origin.source_database;
        let source_coll = origin.source_collection.as_str();
        let source_coll_qualified =
            crate::control::planner::sql_plan_convert::convert::db_qualified(
                source_db_id,
                source_coll,
            );

        let source_row_bytes = fetch_source_row(
            &self.state,
            tenant_id,
            source_db_id,
            &source_coll_qualified,
            document_id,
            surrogate,
        )
        .await
        .map_err(|e| write_err(&format!("clone write fetch source: {e}")))?;

        let Some(source_row_bytes) = source_row_bytes else {
            return Ok(CloneWriteOutcome::Passthrough);
        };

        perform_clone_copyup(CopyUpParams {
            state: &Arc::clone(&self.state),
            tenant_id,
            target_db_id: db_id,
            target_collection: coll_name,
            origin,
            source_surrogate: surrogate,
            source_doc_id: document_id.to_string(),
            source_row_bytes,
        })
        .await
        .map_err(|e| write_err(&format!("clone copyup: {e}")))?;

        Ok(CloneWriteOutcome::Passthrough)
    }

    /// Handle KV CoW write interception (FieldSet / Delete).
    async fn intercept_kv_clone_write(
        &self,
        task: &PhysicalTask,
        tenant_id: TenantId,
    ) -> PgWireResult<CloneWriteOutcome> {
        let (collection_qualified, kv_key, is_delete) = match &task.plan {
            PhysicalPlan::Kv(KvOp::FieldSet {
                collection, key, ..
            }) => (collection.as_str(), key.clone(), false),
            PhysicalPlan::Kv(KvOp::Delete { collection, keys }) => {
                // Delete may have multiple keys; handle each. We serialize here
                // (one tombstone per key) and return Handled with synthetic OK.
                let collection_qualified = collection.as_str();
                let db_id = task.database_id;
                let coll_name = strip_db_prefix(db_id, collection_qualified);

                let catalog_arc = self.state.credentials.catalog();
                let Some(catalog) = catalog_arc.as_ref() else {
                    return Ok(CloneWriteOutcome::Passthrough);
                };

                let desc = catalog
                    .get_collection(db_id, tenant_id.as_u64(), coll_name)
                    .map_err(|e| write_err(&format!("clone kv delete: get_collection: {e}")))?;
                let Some(desc) = desc else {
                    return Ok(CloneWriteOutcome::Passthrough);
                };
                if desc.cloned_from.is_none() {
                    return Ok(CloneWriteOutcome::Passthrough);
                }
                match desc.clone_status {
                    CloneStatus::Materialized => return Ok(CloneWriteOutcome::Passthrough),
                    CloneStatus::Shadowed | CloneStatus::Materializing { .. } => {}
                }

                // Split each key into one of two paths:
                //   • key absent in target (source-only) → record a tombstone
                //     so future scans hide the source row.
                //   • key present in target (already copied up or written
                //     in this clone) → dispatch a real KV Delete to remove
                //     the target row, then ALSO record a tombstone so any
                //     surviving source row remains hidden after deletion.
                //
                // Tombstoning unconditionally for target-resident keys is
                // safe: the source row (if any) must always be hidden in
                // this clone after the user has issued a DELETE.
                let mut keys_to_dispatch: Vec<Vec<u8>> = Vec::new();
                for key in keys {
                    let key_str = String::from_utf8_lossy(key).into_owned();
                    let key_in_target = probe_kv_key_in_target(
                        &self.state,
                        tenant_id,
                        db_id,
                        collection_qualified,
                        key,
                    )
                    .await
                    .map_err(|e| write_err(&format!("clone kv delete probe: {e}")))?;

                    perform_kv_clone_tombstone(KvTombstoneParams {
                        state: &self.state,
                        target_db_id: db_id,
                        target_collection: coll_name,
                        kv_key: key_str,
                    })
                    .map_err(|e| write_err(&format!("clone kv tombstone: {e}")))?;

                    if key_in_target {
                        keys_to_dispatch.push(key.clone());
                    }
                }

                if !keys_to_dispatch.is_empty() {
                    // Dispatch a real Delete for keys that exist in target.
                    let delete_plan = PhysicalPlan::Kv(KvOp::Delete {
                        collection: collection_qualified.to_string(),
                        keys: keys_to_dispatch,
                    });
                    let vshard_id =
                        VShardId::from_collection_in_database(db_id, collection_qualified);
                    let resp = dispatch_data_plane_raw(
                        &self.state,
                        tenant_id,
                        vshard_id,
                        db_id,
                        delete_plan,
                    )
                    .await
                    .map_err(|e| write_err(&format!("clone kv delete dispatch: {e}")))?;
                    return Ok(CloneWriteOutcome::Handled(resp));
                }

                let synthetic_resp = synthetic_ok_response(self.next_request_id(), Lsn::new(0));
                return Ok(CloneWriteOutcome::Handled(synthetic_resp));
            }
            _ => return Ok(CloneWriteOutcome::Passthrough),
        };

        // FieldSet path: check clone status, copy-up if needed.
        let db_id = task.database_id;
        let coll_name = strip_db_prefix(db_id, collection_qualified);

        let catalog_arc = self.state.credentials.catalog();
        let Some(catalog) = catalog_arc.as_ref() else {
            return Ok(CloneWriteOutcome::Passthrough);
        };

        let desc = catalog
            .get_collection(db_id, tenant_id.as_u64(), coll_name)
            .map_err(|e| write_err(&format!("clone kv write: get_collection: {e}")))?;
        let Some(desc) = desc else {
            return Ok(CloneWriteOutcome::Passthrough);
        };

        let Some(ref origin) = desc.cloned_from else {
            return Ok(CloneWriteOutcome::Passthrough);
        };
        match desc.clone_status {
            CloneStatus::Materialized => return Ok(CloneWriteOutcome::Passthrough),
            CloneStatus::Shadowed | CloneStatus::Materializing { .. } => {}
        }

        // FieldSet is not a delete.
        let _ = is_delete;

        let key_in_target =
            probe_kv_key_in_target(&self.state, tenant_id, db_id, collection_qualified, &kv_key)
                .await
                .map_err(|e| write_err(&format!("clone kv write probe: {e}")))?;

        if key_in_target {
            // Row exists in target — let the normal FieldSet proceed.
            return Ok(CloneWriteOutcome::Passthrough);
        }

        // Fetch source KV row and copy it up to target.
        let source_db_id = origin.source_database;
        let source_coll = origin.source_collection.as_str();
        let source_coll_qualified =
            crate::control::planner::sql_plan_convert::convert::db_qualified(
                source_db_id,
                source_coll,
            );

        let source_value = fetch_kv_source_value(
            &self.state,
            tenant_id,
            source_db_id,
            &source_coll_qualified,
            &kv_key,
        )
        .await
        .map_err(|e| write_err(&format!("clone kv copyup fetch: {e}")))?;

        let Some(source_value) = source_value else {
            // Row absent in source — let normal FieldSet run (no-op or error from DP).
            return Ok(CloneWriteOutcome::Passthrough);
        };

        let kv_key_str = String::from_utf8_lossy(&kv_key).into_owned();

        perform_kv_clone_copyup(KvCopyUpParams {
            state: &Arc::clone(&self.state),
            tenant_id,
            target_db_id: db_id,
            target_collection: coll_name,
            kv_key,
            source_value_bytes: source_value,
        })
        .await
        .map_err(|e| write_err(&format!("clone kv copyup: {e}")))?;

        // Tombstone the source key so future clone reads do not merge in the
        // now-superseded source row.  The copy-up wrote the row to the target
        // and the FieldSet will overwrite it; the source copy must be hidden.
        perform_kv_clone_tombstone(KvTombstoneParams {
            state: &self.state,
            target_db_id: db_id,
            target_collection: coll_name,
            kv_key: kv_key_str,
        })
        .map_err(|e| write_err(&format!("clone kv tombstone after copyup: {e}")))?;

        // Fall through: let the original FieldSet dispatch to the target.
        Ok(CloneWriteOutcome::Passthrough)
    }
}

// ── helpers ──────────────────────────────────────────────────────────────────

/// Report whether `document_id` is already stored in the clone's target collection.
///
/// Sends a Document `PointGet` straight to the local Data Plane via
/// `dispatch_data_plane_raw`. A response with `Status::Ok` and a non-empty
/// payload counts as "present"; anything else counts as absent.
/// `surrogate` is forwarded exactly as the caller supplied it. The caller may
/// pass `Surrogate::ZERO` when no surrogate is registered for the PK; the
/// handler is then expected to report "not found" (confirm).
async fn probe_row_in_target(
    state: &SharedState,
    tenant_id: TenantId,
    db_id: DatabaseId,
    collection_qualified: &str,
    document_id: &str,
    surrogate: Surrogate,
) -> crate::Result<bool> {
    let vshard = VShardId::from_collection_in_database(db_id, collection_qualified);
    let point_get = DocumentOp::PointGet {
        collection: collection_qualified.to_owned(),
        document_id: document_id.to_owned(),
        surrogate,
        pk_bytes: Vec::from(document_id.as_bytes()),
        rls_filters: Vec::new(),
        system_as_of_ms: None,
        valid_at_ms: None,
    };
    let response = dispatch_data_plane_raw(
        state,
        tenant_id,
        vshard,
        db_id,
        PhysicalPlan::Document(point_get),
    )
    .await?;
    let found = response.status == Status::Ok && !response.payload.is_empty();
    Ok(found)
}

/// Fetch the raw msgpack bytes for a row from the source collection.
///
/// Issues a Document `PointGet` against the source database's shard via
/// `dispatch_data_plane_raw`. Yields `Some(bytes)` only when the response
/// status is `Status::Ok` and the payload is non-empty, and `None` otherwise
/// (row absent in source).
async fn fetch_source_row(
    state: &SharedState,
    tenant_id: TenantId,
    source_db_id: DatabaseId,
    source_coll_qualified: &str,
    document_id: &str,
    surrogate: Surrogate,
) -> crate::Result<Option<Vec<u8>>> {
    let point_get = DocumentOp::PointGet {
        collection: source_coll_qualified.to_owned(),
        document_id: document_id.to_owned(),
        surrogate,
        pk_bytes: Vec::from(document_id.as_bytes()),
        rls_filters: Vec::new(),
        system_as_of_ms: None,
        valid_at_ms: None,
    };
    let vshard = VShardId::from_collection_in_database(source_db_id, source_coll_qualified);
    let response = dispatch_data_plane_raw(
        state,
        tenant_id,
        vshard,
        source_db_id,
        PhysicalPlan::Document(point_get),
    )
    .await?;
    if response.status == Status::Ok && !response.payload.is_empty() {
        Ok(Some(response.payload.as_ref().to_vec()))
    } else {
        Ok(None)
    }
}

/// Report whether `kv_key` is already stored in the clone's target KV collection.
///
/// Sends a `KvOp::Get` to the local Data Plane via `dispatch_data_plane_raw`.
/// A response with `Status::Ok` and a non-empty payload counts as "present".
async fn probe_kv_key_in_target(
    state: &SharedState,
    tenant_id: TenantId,
    db_id: DatabaseId,
    collection_qualified: &str,
    kv_key: &[u8],
) -> crate::Result<bool> {
    let vshard = VShardId::from_collection_in_database(db_id, collection_qualified);
    let get = KvOp::Get {
        collection: collection_qualified.to_owned(),
        key: Vec::from(kv_key),
        rls_filters: Vec::new(),
        // This reads the clone's own target collection and is never delegated
        // to the source, so no surrogate isolation ceiling applies.
        surrogate_ceiling: None,
    };
    let response =
        dispatch_data_plane_raw(state, tenant_id, vshard, db_id, PhysicalPlan::Kv(get)).await?;
    Ok(response.status == Status::Ok && !response.payload.is_empty())
}

/// Fetch the raw value bytes for a KV row from the source collection.
///
/// Issues a `KvOp::Get` against the source database's shard. Yields
/// `Some(bytes)` only for a `Status::Ok` response with a non-empty payload,
/// and `None` otherwise (key absent in source).
async fn fetch_kv_source_value(
    state: &SharedState,
    tenant_id: TenantId,
    source_db_id: DatabaseId,
    source_coll_qualified: &str,
    kv_key: &[u8],
) -> crate::Result<Option<Vec<u8>>> {
    let get = KvOp::Get {
        collection: source_coll_qualified.to_owned(),
        key: Vec::from(kv_key),
        rls_filters: Vec::new(),
        // Copy-up must observe every binding in the source: the target write
        // that follows mirrors the latest source state, and missing a source
        // row here would silently drop data from the clone.
        surrogate_ceiling: None,
    };
    let vshard = VShardId::from_collection_in_database(source_db_id, source_coll_qualified);
    let response = dispatch_data_plane_raw(
        state,
        tenant_id,
        vshard,
        source_db_id,
        PhysicalPlan::Kv(get),
    )
    .await?;
    let found = response.status == Status::Ok && !response.payload.is_empty();
    if found {
        Ok(Some(response.payload.as_ref().to_vec()))
    } else {
        Ok(None)
    }
}

/// Dispatch a plan directly to the local Data Plane, bypassing WAL and Raft.
/// Used only for read probes inside the clone write helper.
async fn dispatch_data_plane_raw(
    state: &SharedState,
    tenant_id: TenantId,
    vshard_id: VShardId,
    database_id: DatabaseId,
    plan: PhysicalPlan,
) -> crate::Result<Response> {
    let req_id = RequestId::new(
        state
            .request_id_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
    );
    let deadline_secs = state.tuning.network.default_deadline_secs;
    let deadline_dur = Duration::from_secs(deadline_secs);
    let req = Request {
        request_id: req_id,
        tenant_id,
        vshard_id,
        database_id,
        plan,
        deadline: std::time::Instant::now() + deadline_dur,
        priority: Priority::Normal,
        trace_id: TraceId::ZERO,
        consistency: ReadConsistency::Strong,
        idempotency_key: None,
        event_source: crate::event::EventSource::User,
        user_roles: Vec::new(),
        user_id: None,
        statement_digest: None,
    };
    let mut rx = state.tracker.register(req_id);
    match state.dispatcher.lock() {
        Ok(mut d) => d.dispatch(req)?,
        Err(p) => p.into_inner().dispatch(req)?,
    }
    tokio::time::timeout(deadline_dur, rx.recv())
        .await
        .map_err(|_| crate::Error::DeadlineExceeded { request_id: req_id })?
        .ok_or(crate::Error::Dispatch {
            detail: "clone write probe: response channel closed".into(),
        })
}

/// Build an `Ok` response with an empty payload, for writes the clone path
/// completed on its own (e.g. a tombstone) without dispatching to the Data Plane.
fn synthetic_ok_response(request_id: RequestId, watermark_lsn: Lsn) -> Response {
    let empty_payload: Vec<u8> = Vec::new();
    Response {
        request_id,
        watermark_lsn,
        status: Status::Ok,
        error_code: None,
        partial: false,
        attempt: 1,
        payload: empty_payload.into(),
    }
}

/// Remove a leading `"<db_id>/"` qualifier from a collection name.
///
/// The default database is never prefixed, so its names are returned as-is.
/// A name lacking the expected prefix is also returned unchanged.
fn strip_db_prefix(db_id: DatabaseId, qualified: &str) -> &str {
    if db_id != DatabaseId::DEFAULT {
        let expected_prefix = format!("{}/", db_id.as_u64());
        if let Some(bare_name) = qualified.strip_prefix(expected_prefix.as_str()) {
            return bare_name;
        }
    }
    qualified
}

/// Wrap a clone write-path failure message as a pgwire `ERROR` with SQLSTATE
/// `XX000` (internal error).
fn write_err(msg: &str) -> PgWireError {
    let info = ErrorInfo::new(
        String::from("ERROR"),
        String::from("XX000"),
        String::from(msg),
    );
    PgWireError::UserError(Box::new(info))
}