use nodedb_types::id::DatabaseId;
use smallvec::SmallVec;
use crate::config::auth::AuthMode;
use crate::control::security::audit::{
ArcAuditEmitter, AuditEmitContext, AuditEmitter, AuditEvent,
};
use crate::control::security::auth_context::{AuthContext, generate_session_id};
use crate::control::security::credential::record::UserRecord;
use crate::control::security::identity::{AuthMethod, AuthenticatedIdentity, DatabaseSet, Role};
use crate::control::security::util::base64_url_decode;
use crate::control::state::SharedState;
use crate::types::TenantId;
pub fn resolve_certificate_identity(
state: &SharedState,
cn: &str,
peer_addr: &str,
) -> crate::Result<AuthenticatedIdentity> {
let identity = state
.credentials
.to_identity(cn, AuthMethod::Certificate)
.ok_or_else(|| {
state.audit_record(
AuditEvent::AuthFailure,
None,
peer_addr,
&format!("mTLS auth failed: no user for cert CN '{cn}'"),
);
state.auth_metrics.record_auth_failure("certificate");
crate::Error::RejectedAuthz {
tenant_id: TenantId::new(0),
resource: format!("no user mapped to certificate CN '{cn}'"),
}
})?;
state.audit_record(
AuditEvent::AuthSuccess,
Some(identity.tenant_id),
peer_addr,
&format!("mTLS cert auth: {cn}"),
);
state.auth_metrics.record_auth_success("certificate");
Ok(identity)
}
fn build_owner_database_set(state: &SharedState, user: &UserRecord) -> DatabaseSet {
if user.is_superuser {
return DatabaseSet::All;
}
if user.is_service_account && !user.accessible_databases.is_empty() {
return DatabaseSet::Some(SmallVec::from_iter(
user.accessible_databases.iter().copied(),
));
}
let db_ids = state
.credentials
.catalog()
.as_ref()
.and_then(|cat| cat.list_user_grant_databases(user.user_id).ok())
.unwrap_or_else(|| vec![DatabaseId::DEFAULT]);
DatabaseSet::Some(SmallVec::from_iter(db_ids))
}
pub fn verify_api_key_identity(
state: &SharedState,
token: &str,
peer_addr: &str,
protocol: &str,
) -> Option<AuthenticatedIdentity> {
let key_record = state.api_keys.verify_key(token)?;
let user = state.credentials.get_user(&key_record.username)?;
let owner_set = build_owner_database_set(state, &user);
let key_set = if key_record.accessible_databases.is_empty() {
owner_set.clone()
} else {
DatabaseSet::Some(SmallVec::from_iter(
key_record.accessible_databases.iter().copied(),
))
};
let effective = owner_set.intersect(&key_set);
let identity =
state
.api_keys
.to_identity(&key_record, user.roles, user.is_superuser, effective);
state.audit_record(
AuditEvent::AuthSuccess,
Some(identity.tenant_id),
peer_addr,
&format!(
"{protocol} api_key auth: {} (key {})",
identity.username, key_record.key_id
),
);
state.auth_metrics.record_auth_success("api_key");
Some(identity)
}
pub fn trust_identity(state: &SharedState, username: &str) -> AuthenticatedIdentity {
if let Some(id) = state.credentials.to_identity(username, AuthMethod::Trust) {
id
} else {
AuthenticatedIdentity {
user_id: 0,
username: username.to_string(),
tenant_id: TenantId::new(1),
auth_method: AuthMethod::Trust,
roles: vec![Role::Superuser],
is_superuser: true,
default_database: None,
accessible_databases: crate::control::security::identity::DatabaseSet::All,
}
}
}
pub const AUTH_FLOOR: std::time::Duration = std::time::Duration::from_millis(200);
pub async fn authenticate(
state: &SharedState,
auth_mode: &AuthMode,
body: &serde_json::Value,
peer_addr: &str,
) -> crate::Result<(AuthenticatedIdentity, Option<String>)> {
let method = body["method"].as_str().unwrap_or("trust");
match method {
"trust" => {
if *auth_mode != AuthMode::Trust {
state.audit_record(
AuditEvent::AuthFailure,
None,
peer_addr,
"trust auth rejected: server requires authentication",
);
return Err(crate::Error::RejectedAuthz {
tenant_id: TenantId::new(0),
resource: "trust mode not enabled".into(),
});
}
let username = body["username"].as_str().unwrap_or("anonymous");
let identity = trust_identity(state, username);
state.audit_record(
AuditEvent::AuthSuccess,
Some(identity.tenant_id),
peer_addr,
&format!("native trust auth: {username}"),
);
state.auth_metrics.record_auth_success("trust");
Ok((identity, None))
}
"password" => {
let username = body["username"]
.as_str()
.ok_or_else(|| crate::Error::BadRequest {
detail: "missing 'username' for password auth".into(),
})?;
let password = body["password"]
.as_str()
.ok_or_else(|| crate::Error::BadRequest {
detail: "missing 'password' for password auth".into(),
})?;
let auth_start = std::time::Instant::now();
use crate::control::security::ratelimit::limiter::LoginRateLimitOutcome;
let peer_ip_str = peer_addr
.parse::<std::net::SocketAddr>()
.map(|s| s.ip().to_string())
.unwrap_or_else(|_| peer_addr.to_string());
let rl_outcome = state.rate_limiter.check_login(&peer_ip_str, username);
if !matches!(rl_outcome, LoginRateLimitOutcome::Allowed) {
let emitter = ArcAuditEmitter(std::sync::Arc::clone(&state.audit));
let detail = match rl_outcome {
LoginRateLimitOutcome::IpExceeded => {
format!("login rate limited (ip={peer_ip_str}): {username}")
}
LoginRateLimitOutcome::UserExceeded => {
format!("login rate limited (user): {username}")
}
LoginRateLimitOutcome::Allowed => unreachable!(),
};
emitter.emit(
AuditEvent::LoginRateLimited,
"login_rate_limit",
&detail,
AuditEmitContext::new(None, "", username),
);
state.auth_metrics.record_auth_failure("password");
enforce_auth_floor(auth_start).await;
return Err(crate::Error::RejectedAuthz {
tenant_id: TenantId::new(0),
resource: "authentication failed".into(),
});
}
if let Err(e) = state.credentials.check_lockout(username) {
enforce_auth_floor(auth_start).await;
return Err(e);
}
let (verified, pw_warning) = state
.credentials
.verify_password_with_status(username, password);
if !verified {
let emitter = ArcAuditEmitter(std::sync::Arc::clone(&state.audit));
let peer_ip = peer_addr
.parse::<std::net::SocketAddr>()
.ok()
.map(|s| s.ip());
state
.credentials
.record_login_failure(username, peer_ip, &emitter);
state.audit_record(
AuditEvent::AuthFailure,
None,
peer_addr,
&format!("native password auth failed: {username}"),
);
state.auth_metrics.record_auth_failure("password");
enforce_auth_floor(auth_start).await;
return Err(crate::Error::RejectedAuthz {
tenant_id: TenantId::new(0),
resource: "authentication failed".into(),
});
}
state.credentials.record_login_success(username);
let identity = state
.credentials
.to_identity(username, AuthMethod::CleartextPassword)
.ok_or_else(|| crate::Error::BadRequest {
detail: format!("user '{username}' not found after password verification"),
})?;
state.audit_record(
AuditEvent::AuthSuccess,
Some(identity.tenant_id),
peer_addr,
&format!("native password auth: {username}"),
);
state.auth_metrics.record_auth_success("password");
if let Some(ref w) = pw_warning {
tracing::warn!(username, warning = %w, "password warning at native password auth");
}
Ok((identity, pw_warning))
}
"api_key" => {
let token = body["token"]
.as_str()
.ok_or_else(|| crate::Error::BadRequest {
detail: "missing 'token' for api_key auth".into(),
})?;
verify_api_key_identity(state, token, peer_addr, "native")
.ok_or_else(|| {
state.audit_record(
AuditEvent::AuthFailure,
None,
peer_addr,
"native api_key auth failed: invalid token or owner not found",
);
state.auth_metrics.record_auth_failure("api_key");
crate::Error::RejectedAuthz {
tenant_id: TenantId::new(0),
resource: "invalid API key".into(),
}
})
.map(|id| (id, None))
}
"oidc_bearer" => {
let token = body["token"]
.as_str()
.ok_or_else(|| crate::Error::BadRequest {
detail: "missing 'token' for oidc_bearer auth".into(),
})?;
let identity =
crate::control::security::oidc::verify_bearer_token(state, token).await?;
state.audit_record(
AuditEvent::AuthSuccess,
Some(identity.tenant_id),
peer_addr,
&format!(
"OIDC bearer login: sub={} method=oidc_bearer",
identity.username
),
);
state.auth_metrics.record_auth_success("oidc_bearer");
Ok((identity, None))
}
other => Err(crate::Error::BadRequest {
detail: format!(
"unknown auth method: '{other}'. Use 'trust', 'password', 'api_key', or 'oidc_bearer'."
),
}),
}
}
pub fn build_auth_context(identity: &AuthenticatedIdentity) -> AuthContext {
let mut ctx = AuthContext::from_identity(identity, generate_session_id());
ctx.database_id = identity.default_database;
ctx
}
pub fn enrich_auth_context_with_scopes(
ctx: &mut AuthContext,
scope_grants: &crate::control::security::scope::grant::ScopeGrantStore,
org_ids: &[String],
) {
let effective = scope_grants.effective_scopes(&ctx.id, org_ids);
for scope_name in &effective {
let status = scope_grants.scope_status(scope_name, "user", &ctx.id);
ctx.metadata
.insert(format!("scope_status.{scope_name}"), status.to_string());
let expires_at = scope_grants.scope_expires_at(scope_name, "user", &ctx.id);
if expires_at > 0 {
ctx.metadata.insert(
format!("scope_expires_at.{scope_name}"),
expires_at.to_string(),
);
}
}
let scope_list: Vec<String> = effective.into_iter().collect();
if !scope_list.is_empty() {
ctx.metadata.insert("scopes".into(), scope_list.join(","));
}
}
pub fn build_auth_context_with_session(
identity: &AuthenticatedIdentity,
sessions: &crate::control::server::pgwire::session::SessionStore,
addr: &std::net::SocketAddr,
) -> AuthContext {
if let Some(token) = sessions.get_parameter(addr, "nodedb.auth_token") {
if token.matches('.').count() == 2 {
if let Some(payload_b64) = token.split('.').nth(1)
&& let Some(payload_bytes) = base64_url_decode(payload_b64)
&& let Ok(claims) =
sonic_rs::from_slice::<crate::control::security::jwt::JwtClaims>(&payload_bytes)
{
let mut ctx = AuthContext::from_jwt(&claims, generate_session_id());
if let Some(on_deny_val) = sessions.get_parameter(addr, "nodedb.on_deny")
&& let Ok(mode) = crate::control::security::deny::parse_on_deny(&[&on_deny_val])
{
ctx.on_deny_override = Some(mode);
}
return ctx;
}
}
}
let mut ctx = build_auth_context(identity);
if let Some(on_deny_val) = sessions.get_parameter(addr, "nodedb.on_deny")
&& let Ok(mode) = crate::control::security::deny::parse_on_deny(&[&on_deny_val])
{
ctx.on_deny_override = Some(mode);
}
if let Some(db) = sessions.get_current_database(addr) {
ctx.database_id = Some(db);
}
ctx
}
pub fn extract_and_apply_on_deny(
sql: &str,
auth_ctx: &mut crate::control::security::auth_context::AuthContext,
) -> String {
let lower = sql.to_lowercase();
let Some(idx) = lower.rfind("on deny ") else {
return sql.to_string();
};
let trimmed = lower.trim_start();
if !trimmed.starts_with("select") && !trimmed.starts_with("with") {
return sql.to_string();
}
let on_deny_part = &sql[idx + "on deny ".len()..];
let parts: Vec<&str> = on_deny_part.split_whitespace().collect();
match crate::control::security::deny::parse_on_deny(&parts) {
Ok(mode) => {
auth_ctx.on_deny_override = Some(mode);
sql[..idx].trim_end().to_string()
}
Err(_) => sql.to_string(),
}
}
pub fn check_blacklist(
state: &SharedState,
identity: &AuthenticatedIdentity,
peer_addr: &str,
) -> crate::Result<()> {
let user_id = identity.user_id.to_string();
if let Some(entry) = state.blacklist.check_user(&user_id) {
state.audit_record(
AuditEvent::AuthFailure,
Some(identity.tenant_id),
peer_addr,
&format!(
"blacklisted user '{}' denied: {}",
identity.username, entry.reason
),
);
return Err(crate::Error::RejectedAuthz {
tenant_id: identity.tenant_id,
resource: format!("user blacklisted: {}", entry.reason),
});
}
if let Some(entry) = state.blacklist.check_ip(peer_addr) {
state.audit_record(
AuditEvent::AuthFailure,
Some(identity.tenant_id),
peer_addr,
&format!("blacklisted IP '{peer_addr}' denied: {}", entry.reason),
);
return Err(crate::Error::RejectedAuthz {
tenant_id: identity.tenant_id,
resource: format!("IP blacklisted: {}", entry.reason),
});
}
if let Some(status) = state.auth_users.get_status(&user_id) {
let ctx_status = status;
if matches!(
ctx_status,
crate::control::security::auth_context::AuthStatus::Suspended
| crate::control::security::auth_context::AuthStatus::Banned
) {
state.audit_record(
AuditEvent::AuthFailure,
Some(identity.tenant_id),
peer_addr,
&format!(
"auth user '{}' denied: account {}",
identity.username, ctx_status
),
);
return Err(crate::Error::RejectedAuthz {
tenant_id: identity.tenant_id,
resource: format!("account {ctx_status}"),
});
}
}
let user_org_ids = state.orgs.orgs_for_user(&user_id);
for org_id in &user_org_ids {
if !state.orgs.is_active(org_id) {
state.audit_record(
AuditEvent::AuthFailure,
Some(identity.tenant_id),
peer_addr,
&format!(
"org '{}' is not active — user '{}' blocked",
org_id, identity.username
),
);
return Err(crate::Error::RejectedAuthz {
tenant_id: identity.tenant_id,
resource: format!("organization '{org_id}' is suspended"),
});
}
}
Ok(())
}
pub fn check_rate_limit(
state: &SharedState,
identity: &AuthenticatedIdentity,
auth_ctx: &AuthContext,
operation: &str,
database_id: nodedb_types::DatabaseId,
) -> crate::Result<crate::control::security::ratelimit::limiter::RateLimitResult> {
use crate::control::security::ratelimit::limiter::QuotaCheckParams;
let plan_tier = auth_ctx.metadata.get("plan").map(|s| s.as_str());
let quota_params = state.credentials.catalog().as_ref().and_then(|catalog| {
let tenant_max_qps = catalog
.get_tenant_quota(database_id, identity.tenant_id)
.ok()
.flatten()
.and_then(|r| {
if r.max_qps > 0 {
Some(r.max_qps as u64)
} else {
None
}
});
let database_max_qps = catalog
.get_database_quota(database_id)
.ok()
.flatten()
.and_then(|r| {
if r.max_qps > 0 {
Some(r.max_qps as u64)
} else {
None
}
});
if tenant_max_qps.is_some() || database_max_qps.is_some() {
Some(QuotaCheckParams {
tenant_max_qps,
database_max_qps,
tenant_id: identity.tenant_id,
database_id,
})
} else {
None
}
});
let result = state.rate_limiter.check(
&identity.user_id.to_string(),
&auth_ctx.org_ids,
plan_tier,
operation,
quota_params.as_ref(),
);
if !result.allowed {
return Err(crate::Error::RejectedAuthz {
tenant_id: identity.tenant_id,
resource: format!("rate limited: retry after {}s", result.retry_after_secs),
});
}
Ok(result)
}
async fn enforce_auth_floor(auth_start: std::time::Instant) {
let deadline = auth_start + AUTH_FLOOR;
let now = std::time::Instant::now();
if deadline > now {
tokio::time::sleep(deadline - now).await;
}
}
pub fn redact_token(token: &str) -> String {
if token.len() <= 10 {
"***".into()
} else {
format!("{}...", &token[..10])
}
}