nodedb 0.3.0

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
// SPDX-License-Identifier: BUSL-1.1

//! pgwire wire-level tests for per-connection tenant scoping and Trust-mode
//! identity resolution.
//!
//! Covers the class of bug where the pgwire handler's planning context is
//! built once with a fixed tenant id, so queries from tenant-scoped users
//! plan against the wrong tenant's catalog. Also covers Trust-mode accepting
//! usernames that were never created as if they were superusers.

mod common;

use common::pgwire_harness::TestServer;

/// Helper: superuser-side bootstrap — create a tenant and a tenant-scoped
/// user, plus a collection owned by that tenant. The harness's default
/// connection is Trust/superuser (tenant 1) and is used for setup.
async fn bootstrap_tenant_user(server: &TestServer, user: &str, collection: &str) {
    server
        .exec("CREATE TENANT acme ID 2")
        .await
        .expect("CREATE TENANT");
    server
        .exec(&format!(
            "CREATE USER {user} WITH PASSWORD 'x' ROLE readwrite TENANT 2"
        ))
        .await
        .expect("CREATE USER");
    // Create the collection as the tenant-scoped user so ownership lands on
    // tenant 2. This itself exercises the DDL path, which already reads
    // identity.tenant_id correctly.
    let (svc, _h) = server
        .connect_as(user, "x")
        .await
        .expect("tenant user connect");
    svc.simple_query(&format!(
        "CREATE COLLECTION {collection}  \
         (id TEXT PRIMARY KEY, content TEXT NOT NULL) WITH (engine='document_strict')"
    ))
    .await
    .expect("tenant user CREATE COLLECTION");
    drop(svc);
}

/// Run a simple query under a freshly opened tenant-user connection and
/// return either the rows (first column) or the server error message.
async fn query_as(server: &TestServer, user: &str, sql: &str) -> Result<Vec<String>, String> {
    let (client, _h) = server.connect_as(user, "x").await?;
    match client.simple_query(sql).await {
        Ok(msgs) => {
            let mut rows = Vec::new();
            for msg in msgs {
                if let tokio_postgres::SimpleQueryMessage::Row(row) = msg {
                    rows.push(row.get(0).unwrap_or("").to_string());
                }
            }
            Ok(rows)
        }
        Err(e) => Err(pg_err(&e)),
    }
}

fn pg_err(e: &tokio_postgres::Error) -> String {
    if let Some(db) = e.as_db_error() {
        format!("{}: {}", db.code().code(), db.message())
    } else {
        format!("{e:?}")
    }
}

// ── tenant-scoped planning via shared query_ctx ────────────────────────

#[tokio::test]
async fn tenant_user_can_select_own_collection() {
    let server = TestServer::start().await;
    bootstrap_tenant_user(&server, "svc_sel", "t2_sel").await;

    let (svc, _h) = server.connect_as("svc_sel", "x").await.unwrap();
    svc.simple_query("INSERT INTO t2_sel (id, content) VALUES ('a', 'alpha')")
        .await
        .expect("INSERT under tenant user");

    let rows = query_as(&server, "svc_sel", "SELECT id FROM t2_sel")
        .await
        .expect("SELECT under tenant user must not fail with 'unknown table'");
    assert_eq!(rows.len(), 1, "expected 1 row, got {rows:?}");
    assert_eq!(rows[0], "a", "row should contain id 'a': {rows:?}");
}

#[tokio::test]
async fn tenant_user_can_insert_into_own_collection() {
    let server = TestServer::start().await;
    bootstrap_tenant_user(&server, "svc_ins", "t2_ins").await;

    let (svc, _h) = server.connect_as("svc_ins", "x").await.unwrap();
    svc.simple_query("INSERT INTO t2_ins (id, content) VALUES ('k', 'v')")
        .await
        .expect("INSERT must succeed for tenant-owned collection");
}

#[tokio::test]
async fn tenant_user_can_update_own_collection() {
    let server = TestServer::start().await;
    bootstrap_tenant_user(&server, "svc_upd", "t2_upd").await;

    let (svc, _h) = server.connect_as("svc_upd", "x").await.unwrap();
    svc.simple_query("INSERT INTO t2_upd (id, content) VALUES ('a', 'old')")
        .await
        .unwrap();
    svc.simple_query("UPDATE t2_upd SET content = 'new' WHERE id = 'a'")
        .await
        .expect("UPDATE must not fail with 'unknown table'");

    let rows = query_as(
        &server,
        "svc_upd",
        "SELECT content FROM t2_upd WHERE id = 'a'",
    )
    .await
    .unwrap();
    assert_eq!(rows.len(), 1);
    assert_eq!(
        rows[0], "new",
        "row should reflect updated content: {rows:?}"
    );
}

#[tokio::test]
async fn tenant_user_can_delete_from_own_collection() {
    let server = TestServer::start().await;
    bootstrap_tenant_user(&server, "svc_del", "t2_del").await;

    let (svc, _h) = server.connect_as("svc_del", "x").await.unwrap();
    svc.simple_query("INSERT INTO t2_del (id, content) VALUES ('a', 'x')")
        .await
        .unwrap();
    svc.simple_query("DELETE FROM t2_del WHERE id = 'a'")
        .await
        .expect("DELETE must not fail with 'unknown table'");

    let rows = query_as(&server, "svc_del", "SELECT id FROM t2_del")
        .await
        .unwrap();
    assert!(rows.is_empty(), "row should be deleted, got {rows:?}");
}

#[tokio::test]
async fn tenant_user_prepared_select_resolves_own_collection() {
    // Extended protocol goes through NodeDbQueryParser::parse_sql, which
    // builds its own OriginCatalog with a hardcoded tenant id. A tenant-2
    // user must still be able to prepare and execute a statement against
    // a tenant-2 collection.
    let server = TestServer::start().await;
    bootstrap_tenant_user(&server, "svc_prep", "t2_prep").await;

    let (svc, _h) = server.connect_as("svc_prep", "x").await.unwrap();
    svc.simple_query("INSERT INTO t2_prep (id, content) VALUES ('a', 'alpha')")
        .await
        .unwrap();

    // `prepare` drives Parse + Describe through NodeDbQueryParser::parse_sql,
    // which constructs an OriginCatalog. Before the fix this catalog was
    // hardcoded to tenant 1 and could not see tenant-2 collections —
    // `prepare` would surface the server's "unknown table" error.
    svc.prepare("SELECT content FROM t2_prep WHERE id = 'a'")
        .await
        .expect("prepare must resolve tenant-owned collection via parser.rs");
}

#[tokio::test]
async fn tenant_user_cannot_see_other_tenants_collection_as_empty() {
    // Asymmetric-isolation guard: a tenant-2 user issuing SELECT against a
    // tenant-1 collection must NOT silently return an empty result set.
    // The correct behavior is the same "unknown table" a cross-tenant
    // planner produces the other direction. Silent empty is a data-shape
    // leak vector even though no rows cross.
    let server = TestServer::start().await;
    server
        .exec(
            "CREATE COLLECTION t1_only  \
             (id TEXT PRIMARY KEY, secret TEXT NOT NULL) WITH (engine='document_strict')",
        )
        .await
        .unwrap();
    server
        .exec("INSERT INTO t1_only (id, secret) VALUES ('a', 'classified')")
        .await
        .unwrap();

    server.exec("CREATE TENANT acme ID 2").await.unwrap();
    server
        .exec("CREATE USER svc_xtn WITH PASSWORD 'x' ROLE readwrite TENANT 2")
        .await
        .unwrap();

    let result = query_as(&server, "svc_xtn", "SELECT secret FROM t1_only").await;
    match result {
        Err(msg) => {
            // Accept both legacy "unknown table" and the canonical
            // "table not found" wording — semantics are identical
            // (cross-tenant isolation enforced via lookup failure).
            let lower = msg.to_lowercase();
            assert!(
                lower.contains("unknown table") || lower.contains("table not found"),
                "expected isolation error, got: {msg}"
            );
        }
        Ok(rows) => panic!(
            "tenant-2 user must not see tenant-1 collection via silent empty result; got rows={rows:?}"
        ),
    }
}

// ── Trust-mode identity resolution ─────────────────────────────────────

#[tokio::test]
async fn trust_mode_rejects_unknown_username() {
    // Under Trust mode NodeDB skips password verification, but it must
    // still resolve the connecting username against the credential store.
    // Accepting a fabricated username silently promotes arbitrary clients
    // to tenant-1 superuser (see core.rs resolve_identity fallback).
    let server = TestServer::start().await;

    let result = server
        .connect_as("nosuchuser_ever_created", "anything")
        .await;
    assert!(
        result.is_err(),
        "Trust mode must reject a username that was never CREATE USER'd; got an accepted connection"
    );
}

#[tokio::test]
async fn trust_mode_unknown_user_cannot_run_superuser_ddl() {
    // Defense-in-depth regression guard for the same root cause: even if
    // a connection is somehow permitted, an unknown username MUST NOT be
    // silently fabricated as a superuser identity capable of tenant/user
    // management DDL. Today core.rs sets `is_superuser: true` in the
    // fallback branch, so `CREATE USER` from a fabricated name succeeds.
    let server = TestServer::start().await;

    let Ok((client, _h)) = server.connect_as("ghost_admin", "anything").await else {
        // If connect_as is already rejected by the prior test's fix, that
        // is a strictly stronger guarantee — the bug is still captured.
        return;
    };

    let result = client
        .simple_query("CREATE USER mallory WITH PASSWORD 'y' ROLE readwrite TENANT 1")
        .await;
    assert!(
        result.is_err(),
        "unknown Trust-mode user must not be granted superuser privileges; CREATE USER succeeded"
    );
    if let Err(e) = result {
        let msg = pg_err(&e);
        assert!(
            msg.contains("42501") || msg.to_lowercase().contains("permission"),
            "expected a permission-denied error, got: {msg}"
        );
    }
}

// ── Runtime SET keys that touch identity / security ────────────────────────
//
// Superusers may switch session tenant context at runtime via
// `SET TENANT = '<name>' | <id> | DEFAULT` and `SET nodedb.tenant_id = <id>`.
// The override is applied at every `resolve_identity` call so every planning
// / routing path naturally honors it without per-callsite changes. The
// switch is rejected for non-superusers (42501) and inside active
// transactions (25001), and invalidates the session's plan and prepared-
// statement caches so plans against the prior tenant's catalog cannot be
// reused. `RESET TENANT` (and `SET TENANT = DEFAULT`) restores the
// identity-bound tenant.
//
// `SET ROLE` / `SET SESSION AUTHORIZATION` are not implemented (identity is
// otherwise bound at connection time) and reject with 0A000 instead of
// silently storing the value. Unknown runtime parameters reject with 42704,
// mirroring the existing SHOW asymmetry.

/// Run a single statement under a freshly opened connection and return either
/// `Ok(())` or the server's error message.
async fn exec_as(server: &TestServer, user: &str, sql: &str) -> Result<(), String> {
    let (client, _h) = server.connect_as(user, "x").await?;
    client
        .simple_query(sql)
        .await
        .map(|_| ())
        .map_err(|e| pg_err(&e))
}

fn assert_rejected_with_any(result: Result<(), String>, codes: &[&str], context: &str) {
    match result {
        Ok(()) => panic!(
            "{context}: spec requires rejection so silent identity misrouting is impossible, \
             but the server returned success"
        ),
        Err(msg) => {
            let lower = msg.to_lowercase();
            let code_hit = codes.iter().any(|c| msg.contains(c));
            let wording_hit = lower.contains("not supported")
                || lower.contains("unrecognized")
                || lower.contains("unknown")
                || lower.contains("permission")
                || lower.contains("insufficient");
            assert!(
                code_hit || wording_hit,
                "{context}: expected one of {codes:?} or an explanatory wording, got: {msg}"
            );
        }
    }
}

#[tokio::test]
async fn set_tenant_as_superuser_switches_effective_tenant() {
    // The end-to-end spec: a superuser issues SET TENANT and subsequent
    // writes land in the target tenant's catalog, not tenant 0. This is
    // the behavior the silent-no-op bug obscured.
    let server = TestServer::start().await;
    server.exec("CREATE TENANT acme ID 2").await.unwrap();

    let (svc, _h) = server.connect_as("nodedb", "nodedb").await.unwrap();
    svc.simple_query("SET TENANT = 'acme'")
        .await
        .expect("superuser SET TENANT must succeed");

    // Collection created under the switched tenant must belong to tenant 2.
    svc.simple_query(
        "CREATE COLLECTION switched_under_acme  \
         (id TEXT PRIMARY KEY, content TEXT NOT NULL) WITH (engine='document_strict')",
    )
    .await
    .expect("CREATE COLLECTION under switched tenant must succeed");
    svc.simple_query("INSERT INTO switched_under_acme (id, content) VALUES ('a', 'x')")
        .await
        .expect("INSERT under switched tenant must succeed");

    // SHOW TENANT (singular) reports the effective tenant on this connection.
    let msgs = svc.simple_query("SHOW TENANT").await.expect("SHOW TENANT");
    let row = msgs
        .iter()
        .find_map(|m| {
            if let tokio_postgres::SimpleQueryMessage::Row(r) = m {
                Some(r)
            } else {
                None
            }
        })
        .expect("SHOW TENANT must return a row");
    assert_eq!(
        row.get("tenant_id"),
        Some("2"),
        "SHOW TENANT must report the switched tenant id"
    );
    assert_eq!(
        row.get("tenant_name"),
        Some("acme"),
        "SHOW TENANT must report the tenant name"
    );

    // The same collection name created under tenant 0 must NOT collide,
    // because the switched session writes to tenant 2's catalog.
    server
        .exec(
            "CREATE COLLECTION switched_under_acme  \
             (id TEXT PRIMARY KEY, content TEXT NOT NULL) WITH (engine='document_strict')",
        )
        .await
        .expect("same name in tenant 0 must succeed — proof switched session was tenant 2");
}

#[tokio::test]
async fn set_tenant_as_tenant_user_must_not_silently_noop() {
    let server = TestServer::start().await;
    bootstrap_tenant_user(&server, "svc_settnt", "t2_settnt").await;
    server.exec("CREATE TENANT other ID 3").await.unwrap();

    let res = exec_as(&server, "svc_settnt", "SET TENANT = 'other'").await;
    assert_rejected_with_any(
        res,
        &["0A000", "42501"],
        "SET TENANT as tenant-scoped user crossing tenants",
    );
}

#[tokio::test]
async fn set_nodedb_tenant_id_switches_effective_tenant() {
    // The integer alias resolves directly without a name lookup. End-to-end:
    // SET nodedb.tenant_id = 2 → writes route to tenant 2.
    let server = TestServer::start().await;
    server.exec("CREATE TENANT acme ID 2").await.unwrap();

    let (svc, _h) = server.connect_as("nodedb", "nodedb").await.unwrap();
    svc.simple_query("SET nodedb.tenant_id = 2")
        .await
        .expect("SET nodedb.tenant_id must succeed for superuser");
    svc.simple_query(
        "CREATE COLLECTION numeric_alias_check  \
         (id TEXT PRIMARY KEY, content TEXT NOT NULL) WITH (engine='document_strict')",
    )
    .await
    .expect("CREATE under integer-alias switch must succeed");
    server
        .exec(
            "CREATE COLLECTION numeric_alias_check  \
             (id TEXT PRIMARY KEY, content TEXT NOT NULL) WITH (engine='document_strict')",
        )
        .await
        .expect("same name under tenant 0 must succeed — proof switched session was tenant 2");
}

#[tokio::test]
async fn reset_tenant_restores_identity_bound_tenant() {
    let server = TestServer::start().await;
    server.exec("CREATE TENANT acme ID 2").await.unwrap();

    let (svc, _h) = server.connect_as("nodedb", "nodedb").await.unwrap();
    svc.simple_query("SET TENANT = 'acme'").await.unwrap();
    svc.simple_query("RESET TENANT")
        .await
        .expect("RESET TENANT must succeed");

    // After RESET, a collection created here lands back in tenant 0; a
    // duplicate name in tenant 0 must conflict.
    svc.simple_query(
        "CREATE COLLECTION post_reset_check  \
         (id TEXT PRIMARY KEY, content TEXT NOT NULL) WITH (engine='document_strict')",
    )
    .await
    .expect("CREATE after RESET must land in tenant 0");
    let res = server
        .exec(
            "CREATE COLLECTION post_reset_check  \
             (id TEXT PRIMARY KEY, content TEXT NOT NULL) WITH (engine='document_strict')",
        )
        .await;
    assert!(
        res.is_err(),
        "duplicate name in tenant 0 must conflict — proves RESET TENANT actually restored"
    );
}

#[tokio::test]
async fn set_tenant_default_clears_override() {
    let server = TestServer::start().await;
    server.exec("CREATE TENANT acme ID 2").await.unwrap();

    let (svc, _h) = server.connect_as("nodedb", "nodedb").await.unwrap();
    svc.simple_query("SET TENANT = 'acme'").await.unwrap();
    svc.simple_query("SET TENANT = DEFAULT")
        .await
        .expect("SET TENANT = DEFAULT must succeed");

    // SHOW TENANT should now report identity-bound tenant (0 for superuser).
    let msgs = svc.simple_query("SHOW TENANT").await.unwrap();
    let row = msgs
        .iter()
        .find_map(|m| {
            if let tokio_postgres::SimpleQueryMessage::Row(r) = m {
                Some(r)
            } else {
                None
            }
        })
        .expect("SHOW TENANT row");
    assert_ne!(
        row.get("tenant_id"),
        Some("2"),
        "after DEFAULT, SHOW TENANT must not still report tenant 2"
    );
}

#[tokio::test]
async fn set_tenant_inside_transaction_is_rejected() {
    let server = TestServer::start().await;
    server.exec("CREATE TENANT acme ID 2").await.unwrap();

    let (svc, _h) = server.connect_as("nodedb", "nodedb").await.unwrap();
    svc.simple_query("BEGIN").await.unwrap();
    let res = svc.simple_query("SET TENANT = 'acme'").await;
    match res {
        Err(e) => {
            let msg = pg_err(&e);
            assert!(
                msg.contains("25001") || msg.to_lowercase().contains("transaction"),
                "expected 25001 active_sql_transaction, got: {msg}"
            );
        }
        Ok(_) => panic!(
            "SET TENANT inside an active transaction must reject — \
             tenant context cannot change while snapshot / locks are held"
        ),
    }
    let _ = svc.simple_query("ROLLBACK").await;
}

#[tokio::test]
async fn set_nodedb_tenant_id_non_integer_value_is_rejected() {
    // Regression guard for the only currently-correct branch in handle_set:
    // a non-integer value already returns 22023. Keep it locked.
    let server = TestServer::start().await;

    let res = exec_as(&server, "nodedb", "SET nodedb.tenant_id = 'not-an-int'").await;
    match res {
        Err(msg) => assert!(
            msg.contains("22023"),
            "expected 22023 invalid_parameter_value, got: {msg}"
        ),
        Ok(()) => panic!("non-integer nodedb.tenant_id must be rejected with 22023"),
    }
}

#[tokio::test]
async fn set_role_must_not_silently_noop() {
    let server = TestServer::start().await;

    let res = exec_as(&server, "nodedb", "SET ROLE readonly").await;
    assert_rejected_with_any(
        res,
        &["0A000"],
        "SET ROLE (no runtime role-switch path is wired)",
    );
}

#[tokio::test]
async fn set_session_authorization_must_not_silently_noop() {
    let server = TestServer::start().await;
    server
        .exec("CREATE USER svc_other WITH PASSWORD 'x' ROLE readwrite TENANT 1")
        .await
        .unwrap();

    let res = exec_as(&server, "nodedb", "SET SESSION AUTHORIZATION 'svc_other'").await;
    assert_rejected_with_any(
        res,
        &["0A000"],
        "SET SESSION AUTHORIZATION (no runtime identity-switch path is wired)",
    );
}

#[tokio::test]
async fn set_unknown_runtime_parameter_is_rejected_like_show() {
    // SHOW <unknown> returns 42704 (params.rs::is_known_pg_runtime_parameter).
    // SET <unknown> must match — silently storing an arbitrary key in the
    // session bag is what allowed SET TENANT to look successful in the first
    // place. The asymmetry between SET and SHOW is the structural flaw.
    let server = TestServer::start().await;

    let res = exec_as(&server, "nodedb", "SET nodedb.no_such_knob = 'x'").await;
    assert_rejected_with_any(res, &["42704"], "SET on an unknown runtime parameter");
}

/// `SHOW TENANTS` reports each tenant's name alongside its id, so a tenant
/// can be identified by name without grepping the audit log.
#[tokio::test]
async fn show_tenants_includes_tenant_name() {
    let server = TestServer::start().await;
    server
        .exec("CREATE TENANT acme ID 7")
        .await
        .expect("CREATE TENANT");

    let rows = server
        .query_named_rows("SHOW TENANTS")
        .await
        .expect("SHOW TENANTS");
    let acme = rows
        .iter()
        .find(|r| r.get("tenant_id").map(String::as_str) == Some("7"))
        .expect("tenant 7 present in SHOW TENANTS");
    assert_eq!(
        acme.get("name").map(String::as_str),
        Some("acme"),
        "SHOW TENANTS must report the tenant name; row was {acme:?}"
    );
}