nodedb 0.2.0

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
// SPDX-License-Identifier: BUSL-1.1

use nodedb_types::DatabaseId;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;

use nodedb_sql::ddl_ast::AlterUserOp;

use crate::control::security::audit::AuditEvent;
use crate::control::security::identity::{AuthenticatedIdentity, Role};
use crate::control::state::SharedState;
use crate::types::TenantId;

use super::super::types::{parse_role, require_tenant_admin, sqlstate_error};

/// CREATE USER <name> WITH PASSWORD '<password>' [ROLE <role>] [TENANT <id>]
pub fn create_user(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    username: &str,
    password: &str,
    role_name: Option<&str>,
    tenant_id_override: Option<u64>,
) -> PgWireResult<Vec<Response>> {
    require_tenant_admin(identity, "create users")?;

    if username.is_empty() {
        return Err(sqlstate_error(
            "42601",
            "syntax: CREATE USER <name> WITH PASSWORD '<password>' [ROLE <role>] [TENANT <id>]",
        ));
    }

    if password.is_empty() {
        return Err(sqlstate_error(
            "42601",
            "password must be a single-quoted string",
        ));
    }

    let role = role_name.map(parse_role).unwrap_or(Role::ReadWrite);
    let tenant_id = if let Some(tid) = tenant_id_override {
        if !identity.is_superuser {
            return Err(sqlstate_error("42501", "only superuser can assign tenants"));
        }
        TenantId::new(tid)
    } else {
        identity.tenant_id
    };

    // Build the full `StoredUser` locally (hash + salt + user_id).
    // Followers cannot reproduce the random salt, so this step
    // MUST happen on the proposer node. The computed record is
    // then replicated verbatim.
    let stored = state
        .credentials
        .prepare_user(username, password, tenant_id, vec![role])
        .map_err(|e| sqlstate_error("42710", &e.to_string()))?;

    let entry = crate::control::catalog_entry::CatalogEntry::PutUser(Box::new(stored.clone()));
    let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
        .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;
    if log_index == 0 {
        // Single-node / no-cluster fallback: install into the
        // in-memory cache so subsequent reads see the user.
        // Persist to redb when a catalog is wired up — the
        // catalog write is best-effort durability, not a gate
        // on the cache update. Test fixtures (and any future
        // fully-in-memory deployment) can run without a redb
        // catalog and still get correct read-after-write.
        if let Some(catalog) = state.credentials.catalog() {
            catalog
                .put_user(&stored)
                .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
        }
        // CREATE USER: no open sessions exist for a brand-new user.
        state.credentials.install_replicated_user(&stored, None);
    } else {
        // Cluster mode: `propose_catalog_entry` waits for the
        // entry to be applied on THIS node, which runs the
        // synchronous post_apply (`install_replicated_user`)
        // inline BEFORE the applied-index watermark bumps. So if
        // our entry really committed, `get_user` must see it now.
        //
        // If `get_user` returns None, the Raft log entry at the
        // index our leader assigned has been truncated and
        // overwritten with a noop from a new leader term (a known
        // Raft subtlety: `propose` returns the assigned log index
        // without waiting for commit; if leadership changes
        // before the quorum ack, the entry is dropped). Return a
        // retryable error so `exec_ddl_on_any_leader` re-proposes
        // on the next attempt against whoever is now leader.
        if state.credentials.get_user(username).is_none() {
            return Err(sqlstate_error(
                "40001",
                "transient: metadata entry truncated by leader change, retry",
            ));
        }
    }

    state.audit_record(
        AuditEvent::PrivilegeChange,
        Some(tenant_id),
        &identity.username,
        &format!("created user '{username}' in tenant {tenant_id}"),
    );

    Ok(vec![Response::Execution(Tag::new("CREATE USER"))])
}

/// ALTER USER <name> ... — typed dispatch for all AlterUserOp forms.
pub fn alter_user(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    username: &str,
    op: &AlterUserOp,
) -> PgWireResult<Vec<Response>> {
    // Typed dispatch for every `AlterUserOp` form.
    //
    // Permissions:
    // - SET PASSWORD / SET DEFAULT DATABASE: allowed on your own user, or on
    //   others for superusers and tenant_admins.
    // - SET ROLE: requires `require_tenant_admin`, and may not target
    //   yourself unless you are a superuser.
    // - MUST CHANGE PASSWORD and the PASSWORD EXPIRES forms: require
    //   `require_tenant_admin`.
    // - On top of that, a non-superuser may only alter users in their own
    //   tenant (see `require_same_tenant`).
    if username.is_empty() {
        return Err(sqlstate_error(
            "42601",
            "syntax: ALTER USER <name> SET PASSWORD '<password>' | SET ROLE <role> | MUST CHANGE PASSWORD | PASSWORD NEVER EXPIRES | PASSWORD EXPIRES ...",
        ));
    }

    // Users can change their own password; admin required for anything else.
    let is_self = username == identity.username;
    let can_alter = is_self || identity.is_superuser || identity.has_role(&Role::TenantAdmin);

    match op {
        AlterUserOp::SetPassword { password } => {
            if !can_alter {
                return Err(sqlstate_error(
                    "42501",
                    "permission denied: can only alter your own user, or be superuser/tenant_admin",
                ));
            }
            require_same_tenant(state, identity, username)?;
            if password.is_empty() {
                return Err(sqlstate_error(
                    "42601",
                    "password must be a non-empty single-quoted string",
                ));
            }
            let stored = state
                .credentials
                .prepare_user_update(username, Some(password.as_str()), None)
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            // Password change — no role/access change; no invalidation.
            propose_and_install(state, stored, None)?;

            state.audit_record(
                AuditEvent::PrivilegeChange,
                Some(identity.tenant_id),
                &identity.username,
                &format!("changed password for user '{username}'"),
            );
            Ok(vec![Response::Execution(Tag::new("ALTER USER"))])
        }

        AlterUserOp::SetRole { role } => {
            if is_self && !identity.is_superuser {
                return Err(sqlstate_error("42501", "cannot change your own role"));
            }
            require_tenant_admin(identity, "change roles")?;
            require_same_tenant(state, identity, username)?;
            if role.is_empty() {
                return Err(sqlstate_error("42601", "expected role name after SET ROLE"));
            }
            let parsed_role: Role = parse_role(role);
            let stored = state
                .credentials
                .prepare_user_update(username, None, Some(vec![parsed_role.clone()]))
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            // A role change alters access rights, so open sessions of the
            // target user are notified.
            propose_and_install(
                state,
                stored,
                Some(crate::control::security::buses::SessionInvalidationReason::RoleAltered),
            )?;

            state.audit_record(
                AuditEvent::PrivilegeChange,
                Some(identity.tenant_id),
                &identity.username,
                &format!("set role '{parsed_role}' for user '{username}'"),
            );
            Ok(vec![Response::Execution(Tag::new("ALTER USER"))])
        }

        AlterUserOp::MustChangePassword => {
            require_tenant_admin(identity, "set must_change_password")?;
            require_same_tenant(state, identity, username)?;
            let stored = state
                .credentials
                .prepare_set_must_change_password(username, true)
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            propose_and_install(state, stored, None)?;

            state.audit_record(
                AuditEvent::PrivilegeChange,
                Some(identity.tenant_id),
                &identity.username,
                &format!("set must_change_password for user '{username}'"),
            );
            Ok(vec![Response::Execution(Tag::new("ALTER USER"))])
        }

        AlterUserOp::PasswordNeverExpires => {
            require_tenant_admin(identity, "set password expiry")?;
            require_same_tenant(state, identity, username)?;
            // An expiry timestamp of 0 is what this arm stores for "never".
            let stored = state
                .credentials
                .prepare_set_password_expires_at(username, 0)
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            propose_and_install(state, stored, None)?;

            state.audit_record(
                AuditEvent::PrivilegeChange,
                Some(identity.tenant_id),
                &identity.username,
                &format!("set PASSWORD NEVER EXPIRES for user '{username}'"),
            );
            Ok(vec![Response::Execution(Tag::new("ALTER USER"))])
        }

        AlterUserOp::PasswordExpiresAt { iso8601 } => {
            require_tenant_admin(identity, "set password expiry")?;
            require_same_tenant(state, identity, username)?;
            let expires_at = parse_iso8601_to_unix(iso8601).map_err(|e| {
                sqlstate_error(
                    "22007",
                    &format!("invalid ISO-8601 datetime '{iso8601}': {e}"),
                )
            })?;
            let stored = state
                .credentials
                .prepare_set_password_expires_at(username, expires_at)
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            propose_and_install(state, stored, None)?;

            state.audit_record(
                AuditEvent::PrivilegeChange,
                Some(identity.tenant_id),
                &identity.username,
                &format!("set PASSWORD EXPIRES '{iso8601}' for user '{username}'"),
            );
            Ok(vec![Response::Execution(Tag::new("ALTER USER"))])
        }

        AlterUserOp::PasswordExpiresInDays { days } => {
            require_tenant_admin(identity, "set password expiry")?;
            require_same_tenant(state, identity, username)?;
            if *days == 0 {
                return Err(sqlstate_error(
                    "22003",
                    "PASSWORD EXPIRES IN requires a positive day count",
                ));
            }
            // Saturate rather than overflow (panic in debug, wrap-around to a
            // past timestamp in release) on absurdly large day counts.
            let expires_at = crate::control::security::time::now_secs()
                .saturating_add((*days as u64).saturating_mul(86_400));
            let stored = state
                .credentials
                .prepare_set_password_expires_at(username, expires_at)
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            propose_and_install(state, stored, None)?;

            state.audit_record(
                AuditEvent::PrivilegeChange,
                Some(identity.tenant_id),
                &identity.username,
                &format!("set PASSWORD EXPIRES IN {days} DAYS for user '{username}'"),
            );
            Ok(vec![Response::Execution(Tag::new("ALTER USER"))])
        }

        AlterUserOp::SetDefaultDatabase { db_name } => {
            // Users can set their own default database; admin may set for others.
            if !can_alter {
                return Err(sqlstate_error(
                    "42501",
                    "permission denied: can only alter your own user, or be superuser/tenant_admin",
                ));
            }
            require_same_tenant(state, identity, username)?;
            if db_name.is_empty() {
                return Err(sqlstate_error(
                    "42601",
                    "syntax: ALTER USER <name> SET DEFAULT DATABASE <db_name>",
                ));
            }
            // Resolve the database name to an ID via the system catalog.
            let catalog = state
                .credentials
                .catalog()
                .as_ref()
                .ok_or_else(|| sqlstate_error("XX000", "system catalog unavailable"))?;
            let db_id = catalog
                .get_database_id_by_name(db_name)
                .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup: {e}")))?
                .ok_or_else(|| {
                    sqlstate_error("42704", &format!("database '{db_name}' does not exist"))
                })?;
            let stored = state
                .credentials
                .prepare_set_default_database(username, db_id.as_u64())
                .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
            propose_and_install(state, stored, None)?;

            state.audit_record(
                AuditEvent::PrivilegeChange,
                Some(identity.tenant_id),
                &identity.username,
                &format!("set default database '{db_name}' for user '{username}'"),
            );
            Ok(vec![Response::Execution(Tag::new("ALTER USER"))])
        }
    }
}

/// Reject an ALTER USER on a user that belongs to a different tenant.
///
/// Admin rights of a non-superuser are scoped to their own tenant. Without
/// this check, a tenant_admin of tenant A could reset passwords or roles of
/// users in tenant B. Superusers and self-service changes always pass.
///
/// A target that does not exist also passes. The following `prepare_*` call
/// then reports the missing user exactly as it did before this check existed.
fn require_same_tenant(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    username: &str,
) -> PgWireResult<()> {
    if identity.is_superuser || username == identity.username {
        return Ok(());
    }
    match state.credentials.get_user(username).map(|u| u.tenant_id) {
        Some(target_tenant) if target_tenant.as_u64() != identity.tenant_id.as_u64() => {
            Err(sqlstate_error(
                "42501",
                &format!("permission denied: user '{username}' belongs to another tenant"),
            ))
        }
        _ => Ok(()),
    }
}

/// Propose a `StoredUser` via Raft and install it locally on single-node.
///
/// `invalidation` is passed to `install_replicated_user` for in-process
/// session notification in single-node mode.  Cluster-mode notifications
/// arrive via `post_apply::user::put` after Raft commit.
fn propose_and_install(
    state: &SharedState,
    stored: crate::control::security::catalog::StoredUser,
    invalidation: Option<crate::control::security::buses::SessionInvalidationReason>,
) -> PgWireResult<()> {
    // Replicate an updated `StoredUser` through the metadata log.
    //
    // In cluster mode (non-zero log index) nothing else happens here:
    // `post_apply::user::put` installs the record and notifies sessions after
    // the Raft commit. In single-node mode (log index 0) the record is
    // persisted to the catalog, when one is configured, and installed into
    // the in-memory cache. `invalidation` is forwarded so in-process sessions
    // can be notified.
    use crate::control::catalog_entry::CatalogEntry;
    use crate::control::metadata_proposer::propose_catalog_entry;

    let proposal = CatalogEntry::PutUser(Box::new(stored.clone()));
    let assigned_index = propose_catalog_entry(state, &proposal)
        .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;

    if assigned_index != 0 {
        // Cluster mode: the apply path owns installation.
        return Ok(());
    }

    if let Some(catalog) = state.credentials.catalog() {
        catalog
            .put_user(&stored)
            .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
    }
    state
        .credentials
        .install_replicated_user(&stored, invalidation);
    Ok(())
}

/// Parse an ISO-8601 datetime string to a Unix timestamp (seconds).
///
/// Accepts formats like:
/// - `2026-12-31T00:00:00Z`
/// - `2026-12-31T00:00:00+00:00`
/// - `2026-12-31` (interpreted as midnight UTC)
fn parse_iso8601_to_unix(s: &str) -> crate::Result<u64> {
    // Parse as RFC 3339 or ISO 8601 date. Manually handle common forms
    // without pulling in an external datetime crate.
    let s = s.trim();

    // Date-only: YYYY-MM-DD
    if s.len() == 10 && s.chars().nth(4) == Some('-') && s.chars().nth(7) == Some('-') {
        return parse_date_to_unix(s);
    }

    // Full datetime: YYYY-MM-DDTHH:MM:SSZ or +offset
    if s.len() >= 19 {
        let date_part = &s[..10];
        let ts_secs = parse_date_to_unix(date_part)?;
        // Parse time part: THH:MM:SS
        let time_part = &s[11..];
        let clean = time_part
            .trim_end_matches('Z')
            .trim_end_matches(|c: char| c == '+' || c == '-' || c.is_ascii_digit() || c == ':');
        let hms: Vec<&str> = clean.split(':').collect();
        let h: u64 = hms.first().and_then(|s| s.trim().parse().ok()).unwrap_or(0);
        let m: u64 = hms.get(1).and_then(|s| s.trim().parse().ok()).unwrap_or(0);
        let sec: u64 = hms
            .get(2)
            .and_then(|s| {
                let raw = s.trim().trim_end_matches(|c: char| !c.is_ascii_digit());
                raw.parse().ok()
            })
            .unwrap_or(0);
        return Ok(ts_secs + h * 3600 + m * 60 + sec);
    }

    Err(crate::Error::BadRequest {
        detail: format!("unrecognised datetime format: '{s}'"),
    })
}

/// Parse YYYY-MM-DD to midnight UTC Unix timestamp.
fn parse_date_to_unix(s: &str) -> crate::Result<u64> {
    let parts: Vec<&str> = s.split('-').collect();
    if parts.len() != 3 {
        return Err(crate::Error::BadRequest {
            detail: format!("expected YYYY-MM-DD, got '{s}'"),
        });
    }
    let y: i64 = parts[0].parse().map_err(|_| crate::Error::BadRequest {
        detail: format!("bad year in '{s}'"),
    })?;
    let mo: u64 = parts[1].parse().map_err(|_| crate::Error::BadRequest {
        detail: format!("bad month in '{s}'"),
    })?;
    let d: u64 = parts[2].parse().map_err(|_| crate::Error::BadRequest {
        detail: format!("bad day in '{s}'"),
    })?;
    if !(1..=12).contains(&mo) || !(1..=31).contains(&d) {
        return Err(crate::Error::BadRequest {
            detail: format!("date out of range in '{s}'"),
        });
    }
    // Simplified Julian Day → Unix: good for dates after 1970-01-01.
    // Uses the civil calendar formula.
    let days = days_since_epoch(y, mo, d)?;
    Ok(days * 86400)
}

fn days_since_epoch(y: i64, mo: u64, d: u64) -> crate::Result<u64> {
    // JDN formula for Gregorian calendar
    let a = (14_i64 - mo as i64) / 12;
    let yr = y + 4800 - a;
    let m = mo as i64 + 12 * a - 3;
    let jdn = d as i64 + (153 * m + 2) / 5 + 365 * yr + yr / 4 - yr / 100 + yr / 400 - 32045;
    // Unix epoch = 1970-01-01 = JDN 2440588
    let unix_days = jdn - 2_440_588;
    if unix_days < 0 {
        return Err(crate::Error::BadRequest {
            detail: format!("date before Unix epoch: {y}-{mo:02}-{d:02}"),
        });
    }
    Ok(unix_days as u64)
}

/// DROP USER <name>
pub fn drop_user(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    // DROP USER <name>
    //
    // `parts[2]` is taken as the user name (presumably `parts[0..2]` are
    // `DROP USER` — TODO confirm with the caller). Requires
    // `require_tenant_admin`. A non-superuser may only drop users of their
    // own tenant and nobody may drop themselves. Collections owned by the
    // dropped user are reassigned to `<tenant_id>_admin`.
    require_tenant_admin(identity, "drop users")?;

    let Some(&username) = parts.get(2) else {
        return Err(sqlstate_error("42601", "syntax: DROP USER <name>"));
    };

    if username == identity.username {
        return Err(sqlstate_error("42501", "cannot drop your own user"));
    }

    // A single lookup serves two purposes. It is the existence pre-check, so
    // a DROP USER on a missing user is a clean error that never touches raft.
    // It also supplies the user's tenant, which ownership reassignment needs.
    let Some(user_tenant) = state.credentials.get_user(username).map(|u| u.tenant_id) else {
        return Err(sqlstate_error(
            "42704",
            &format!("user '{username}' does not exist"),
        ));
    };

    // Admin rights of a non-superuser are scoped to their own tenant.
    if !identity.is_superuser && user_tenant.as_u64() != identity.tenant_id.as_u64() {
        return Err(sqlstate_error(
            "42501",
            &format!("permission denied: user '{username}' belongs to another tenant"),
        ));
    }

    let entry = crate::control::catalog_entry::CatalogEntry::DeactivateUser {
        username: username.to_string(),
    };
    let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
        .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;
    let dropped = if log_index == 0 {
        // Single-node fallback.
        state
            .credentials
            .deactivate_user(username)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?
    } else {
        // Cluster mode: the raft entry committed, so the
        // deactivation WILL be applied on every node. The
        // `post_apply` hook that updates the local in-memory
        // cache runs in a spawned tokio task and may not be
        // visible by the time this function returns — trust the
        // log index rather than re-reading the cache.
        true
    };

    if !dropped {
        return Err(sqlstate_error(
            "42704",
            &format!("user '{username}' does not exist"),
        ));
    }

    // Reassign owned collections to the tenant_admin of the
    // user's tenant. Mutating the parent `StoredCollection`
    // and re-proposing `PutCollection` is the durable path —
    // a bare `PutOwner` would be silently overwritten the
    // next time anyone re-proposed the parent (see
    // `pgwire/ddl/ownership.rs` for the same pattern).
    //
    // Only collections reachable through this user's grants are examined.
    // Reassignment is best-effort: lookup and propose failures are skipped,
    // not surfaced.
    let admin_name = format!("{}_admin", user_tenant.as_u64());
    let grants = state.permissions.grants_for(&format!("user:{username}"));
    if let Some(catalog) = state.credentials.catalog() {
        for grant in &grants {
            let Some(owner_obj) = extract_collection_from_target(&grant.target) else {
                continue;
            };
            if state
                .permissions
                .get_owner("collection", user_tenant, owner_obj)
                .as_deref()
                != Some(username)
            {
                continue;
            }
            let mut stored = match catalog.get_collection(
                DatabaseId::DEFAULT,
                user_tenant.as_u64(),
                owner_obj,
            ) {
                Ok(Some(c)) => c,
                _ => continue,
            };
            stored.owner = admin_name.clone();
            let entry = crate::control::catalog_entry::CatalogEntry::PutCollection(Box::new(
                stored.clone(),
            ));
            // Index 0 means no cluster: apply the new owner locally.
            if let Ok(idx) =
                crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
                && idx == 0
            {
                let _ = catalog.put_collection(DatabaseId::DEFAULT, &stored);
                state.permissions.install_replicated_owner(
                    &crate::control::security::catalog::StoredOwner {
                        object_type: "collection".into(),
                        object_name: stored.name.clone(),
                        tenant_id: stored.tenant_id,
                        owner_username: stored.owner.clone(),
                    },
                );
            }
        }
    }

    state.audit_record(
        AuditEvent::PrivilegeChange,
        Some(identity.tenant_id),
        &identity.username,
        &format!("dropped user '{username}' (ownership reassigned to '{admin_name}')"),
    );
    Ok(vec![Response::Execution(Tag::new("DROP USER"))])
}

/// Extract collection name from a permission target like "collection:1:users".
fn extract_collection_from_target(target: &str) -> Option<&str> {
    // Permission targets look like `collection:<tenant>:<name>`. Return the
    // `<name>` segment, which is everything after the second ':' and may
    // itself contain ':'. Any other target kind yields `None`.
    let after_kind = target.strip_prefix("collection:")?;
    let (_tenant, name) = after_kind.split_once(':')?;
    Some(name)
}