use super::*;
/// Authentication, authorization and analytics-job helpers on the gRPC
/// runtime.
impl GrpcRuntime {
    /// Resolves the caller's identity from request metadata.
    ///
    /// Evaluation order:
    /// 1. A token that looks like a JWT is handed to the OAuth validator when
    ///    one is configured; the validator's verdict (accepted or denied) is
    ///    returned immediately.
    /// 2. Otherwise, when the auth store is enabled, the token is validated
    ///    against it. An unknown or missing token is a denial only when the
    ///    store's config has `require_auth` set.
    /// 3. In every remaining case the caller is [`AuthResult::Anonymous`].
    pub(crate) fn resolve_auth(&self, metadata: &MetadataMap) -> AuthResult {
        let token = grpc_token(metadata);

        if let Some(token_str) = token {
            // Log lines carry this fingerprint instead of the token itself.
            let log_prefix = bearer_token_fingerprint_prefix(token_str);
            // The validator is only fetched for JWT-shaped tokens.
            let validator = if is_jwt_shape(token_str) {
                self.oauth_validator()
            } else {
                None
            };
            if let Some(validator) = validator {
                return match crate::wire::redwire::auth::validate_oauth_jwt(&validator, token_str)
                {
                    Ok((username, role)) => {
                        tracing::info!(
                            target: "reddb::security",
                            transport = "grpc",
                            token_sha256_prefix = %log_prefix,
                            username = %reddb_wire::audit_safe_log_field(&username),
                            role = %role.as_str(),
                            "gRPC OAuth JWT accepted"
                        );
                        AuthResult::from_oauth(crate::auth::OAuthIdentity {
                            username,
                            tenant: None,
                            role,
                            issuer: validator.config().issuer.clone(),
                            subject: None,
                            expires_at_unix_secs: None,
                        })
                    }
                    Err(reason) => {
                        tracing::warn!(
                            target: "reddb::security",
                            transport = "grpc",
                            token_sha256_prefix = %log_prefix,
                            reason = %reddb_wire::audit_safe_log_field(&reason),
                            "gRPC OAuth JWT rejected"
                        );
                        AuthResult::Denied(format!("oauth jwt: {reason}"))
                    }
                };
            }
        }

        if !self.auth_store.is_enabled() {
            return AuthResult::Anonymous;
        }

        // Pick the denial message first; whether it is used depends on
        // `require_auth` below.
        let denial = match token {
            Some(token) => {
                if let Some((username, role)) = self.auth_store.validate_token(token) {
                    return AuthResult::password(username, role);
                }
                "invalid or expired token"
            }
            None => "authentication required",
        };

        if self.auth_store.config().require_auth {
            AuthResult::Denied(denial.into())
        } else {
            AuthResult::Anonymous
        }
    }

    /// Returns a new handle to the configured OAuth validator, if any.
    pub(crate) fn oauth_validator(&self) -> Option<std::sync::Arc<crate::auth::OAuthValidator>> {
        self.oauth_validator.as_ref().map(std::sync::Arc::clone)
    }

    /// Authorizes a read request; shorthand for `authorize(metadata, false)`.
    pub(crate) fn authorize_read(&self, metadata: &MetadataMap) -> Result<(), Status> {
        let is_write = false;
        self.authorize(metadata, is_write)
    }

    /// Authorizes a write request; shorthand for `authorize(metadata, true)`.
    pub(crate) fn authorize_write(&self, metadata: &MetadataMap) -> Result<(), Status> {
        let is_write = true;
        self.authorize(metadata, is_write)
    }

    /// Checks the caller's permission for a read or write.
    ///
    /// A failed permission check is reported as `unauthenticated`. For writes
    /// the runtime write gate is consulted afterwards with `WriteKind::Dml`;
    /// a refusal there is reported as `failed_precondition`.
    pub(crate) fn authorize(&self, metadata: &MetadataMap, is_write: bool) -> Result<(), Status> {
        let auth = self.resolve_auth(metadata);
        check_permission(&auth, is_write, false).map_err(Status::unauthenticated)?;

        if !is_write {
            return Ok(());
        }

        self.runtime
            .check_write(crate::runtime::write_gate::WriteKind::Dml)
            .map(|_| ())
            .map_err(|err| Status::failed_precondition(err.to_string()))
    }

    /// Runs the permission check as `check_permission(.., false, true)` and
    /// reports a failure as `permission_denied`.
    pub(crate) fn authorize_admin(&self, metadata: &MetadataMap) -> Result<(), Status> {
        check_permission(&self.resolve_auth(metadata), false, true)
            .map_err(Status::permission_denied)
    }

    /// Authorizes the `cluster:replication:stream` action and returns the
    /// authenticated principal's name.
    pub(crate) fn authorize_replication_stream<T>(
        &self,
        request: &Request<T>,
    ) -> Result<String, Status> {
        const ACTION: &str = "cluster:replication:stream";
        self.authorize_replication_capability(request, ACTION)
    }

    /// Authorizes the `cluster:replication:ack` action and returns the
    /// authenticated principal's name.
    pub(crate) fn authorize_replication_ack<T>(
        &self,
        request: &Request<T>,
    ) -> Result<String, Status> {
        const ACTION: &str = "cluster:replication:ack";
        self.authorize_replication_capability(request, ACTION)
    }

    /// Authenticates the peer, then asks the auth store to simulate `action`
    /// on the `cluster`/`replication` resource for that principal.
    ///
    /// Anonymous or denied callers get `unauthenticated`; an authenticated
    /// caller whose simulated decision is neither `Allow` nor `AdminBypass`
    /// gets `permission_denied`.
    fn authorize_replication_capability<T>(
        &self,
        request: &Request<T>,
        action: &str,
    ) -> Result<String, Status> {
        use crate::auth::policies::{Decision, ResourceRef};

        let username = match self.resolve_replication_auth(request) {
            AuthResult::Authenticated { username, .. } => username,
            AuthResult::Anonymous => {
                return Err(Status::unauthenticated("authentication required"));
            }
            AuthResult::Denied(reason) => return Err(Status::unauthenticated(reason)),
        };

        let principal = crate::auth::UserId::platform(username.clone());
        let resource = ResourceRef::new("cluster", "replication");
        let outcome = self.auth_store.simulate(
            &principal,
            action,
            &resource,
            crate::auth::store::SimCtx::default(),
        );

        let allowed = matches!(
            outcome.decision,
            Decision::Allow { .. } | Decision::AdminBypass
        );
        if allowed {
            Ok(username)
        } else {
            Err(Status::permission_denied(format!(
                "policy: principal '{username}' is not allowed to perform '{action}'"
            )))
        }
    }

    /// Resolves the identity for replication calls.
    ///
    /// When the request exposes peer certificates, the identity is derived
    /// from the first certificate and the metadata token is not consulted;
    /// the result carries `Role::Read` and `AuthSource::ClientCert`. Without
    /// peer certificates this falls back to [`Self::resolve_auth`].
    fn resolve_replication_auth<T>(&self, request: &Request<T>) -> AuthResult {
        let Some(certs) = request.peer_certs() else {
            return self.resolve_auth(request.metadata());
        };
        let Some(cert) = certs.first() else {
            return AuthResult::Denied("mTLS peer certificate missing".into());
        };

        match crate::cluster::NodeIdentity::from_peer_certificate_der(cert) {
            Ok(identity) => AuthResult::Authenticated {
                username: identity.to_string(),
                role: Role::Read,
                source: AuthSource::ClientCert,
            },
            Err(err) => AuthResult::Denied(format!("mTLS peer identity: {err}")),
        }
    }

    /// Reads the current CDC LSN and asks the runtime to enforce its commit
    /// policy for it; a failure is reported as `deadline_exceeded`.
    pub(crate) fn enforce_commit_policy_after_write(&self) -> Result<(), Status> {
        let post_lsn = self.runtime.cdc_current_lsn();
        match self.runtime.enforce_commit_policy(post_lsn) {
            Ok(_) => Ok(()),
            Err(err) => Err(Status::deadline_exceeded(err.to_string())),
        }
    }

    /// Queues an analytics job and then starts it, both with the same
    /// kind, projection and metadata. A queueing error aborts before the
    /// start call.
    pub(crate) fn start_graph_analytics_job(
        &self,
        kind: impl Into<String>,
        projection: Option<String>,
        metadata: BTreeMap<String, String>,
    ) -> Result<(), Status> {
        let kind: String = kind.into();

        // The queue call gets clones so the originals can move into the
        // start call below.
        self.admin_use_cases()
            .queue_analytics_job(kind.clone(), projection.clone(), metadata.clone())
            .map_err(to_status)?;

        match self
            .admin_use_cases()
            .start_analytics_job(kind, projection, metadata)
        {
            Ok(_) => Ok(()),
            Err(err) => Err(to_status(err)),
        }
    }

    /// Marks an analytics job as completed, discarding the returned value.
    pub(crate) fn complete_graph_analytics_job(
        &self,
        kind: impl Into<String>,
        projection: Option<String>,
        metadata: BTreeMap<String, String>,
    ) -> Result<(), Status> {
        match self
            .admin_use_cases()
            .complete_analytics_job(kind, projection, metadata)
        {
            Ok(_) => Ok(()),
            Err(err) => Err(to_status(err)),
        }
    }

    /// Marks an analytics job as failed, discarding the returned value.
    pub(crate) fn fail_graph_analytics_job(
        &self,
        kind: impl Into<String>,
        projection: Option<String>,
        metadata: BTreeMap<String, String>,
    ) -> Result<(), Status> {
        match self
            .admin_use_cases()
            .fail_analytics_job(kind, projection, metadata)
        {
            Ok(_) => Ok(()),
            Err(err) => Err(to_status(err)),
        }
    }
}
pub(crate) fn to_status(err: crate::api::RedDBError) -> Status {
match err {
crate::api::RedDBError::Query(msg) if msg.starts_with("ask_primary_sync_unavailable:") => {
Status::unavailable(msg)
}
crate::api::RedDBError::QuotaExceeded(msg) => Status::resource_exhausted(msg),
crate::api::RedDBError::ReadOnly(msg) => Status::failed_precondition(msg),
crate::api::RedDBError::FeatureNotEnabled(msg) => Status::failed_precondition(msg),
crate::api::RedDBError::Validation { message, .. } => Status::invalid_argument(message),
other => Status::internal(other.to_string()),
}
}
/// Extracts the caller's token from gRPC metadata.
///
/// Lookup order:
/// 1. `authorization: Bearer <token>` (scheme matched case-insensitively,
///    token trimmed).
/// 2. `x-reddb-token`, returned as sent.
///
/// The fallback header is consulted whenever the `authorization` header does
/// not yield a usable bearer token: it is absent, uses another scheme, cannot
/// be read as a string, or carries only whitespace after the scheme.
///
/// Returns `None` when neither header provides a token.
pub(crate) fn grpc_token(metadata: &MetadataMap) -> Option<&str> {
    const BEARER_PREFIX: &str = "Bearer ";

    let bearer = metadata
        .get("authorization")
        // An unreadable header value is treated like a non-Bearer one, so the
        // `x-reddb-token` fallback below still gets a chance.
        .and_then(|value| value.to_str().ok())
        .and_then(|value| {
            // Checked slicing: no panic path even if the split point were not
            // a character boundary.
            let scheme = value.get(..BEARER_PREFIX.len())?;
            let credentials = value.get(BEARER_PREFIX.len()..)?;
            if scheme.eq_ignore_ascii_case(BEARER_PREFIX) {
                Some(credentials.trim())
            } else {
                None
            }
        })
        // "Bearer" followed only by whitespace carries no token at all.
        .filter(|token| !token.is_empty());

    bearer.or_else(|| metadata.get("x-reddb-token")?.to_str().ok())
}
/// Cheap structural test for "does this look like a compact JWT?".
///
/// Returns `true` only when `token` consists of exactly three `.`-separated
/// segments, each non-empty and made up solely of ASCII letters, digits,
/// `-`, `_` or `=`. Nothing is decoded or verified here.
pub(crate) fn is_jwt_shape(token: &str) -> bool {
    /// A segment is acceptable when it is non-empty and uses only the
    /// allowed byte set.
    fn is_well_formed_segment(segment: &str) -> bool {
        !segment.is_empty()
            && segment.bytes().all(|byte| {
                matches!(
                    byte,
                    b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'='
                )
            })
    }

    let mut segments = token.split('.');
    // Inspect at most the first three segments; all of them must be valid.
    let leading_three_ok = segments
        .by_ref()
        .take(3)
        .filter(|segment| is_well_formed_segment(segment))
        .count()
        == 3;
    // A fourth segment (even an empty one) disqualifies the token.
    leading_three_ok && segments.next().is_none()
}
/// Returns the first four bytes of the token's SHA-256 digest as eight
/// lowercase hex characters.
pub(crate) fn bearer_token_fingerprint_prefix(token: &str) -> String {
    use sha2::{Digest, Sha256};

    let mut hasher = Sha256::new();
    hasher.update(token.as_bytes());
    let digest = hasher.finalize();

    digest
        .iter()
        .take(4)
        .map(|byte| format!("{byte:02x}"))
        .collect()
}
/// Trims `value` and returns the trimmed slice, or `None` when nothing but
/// whitespace (or nothing at all) was supplied.
pub(crate) fn none_if_empty(value: &str) -> Option<&str> {
    Some(value.trim()).filter(|trimmed| !trimmed.is_empty())
}
/// Wraps a JSON value in a successful `PayloadReply`.
///
/// If the value cannot be rendered to a string, the payload falls back to
/// the literal `{}` and the reply is still marked `ok`.
pub(crate) fn json_payload_reply(value: JsonValue) -> PayloadReply {
    let payload = match json_to_string(&value) {
        Ok(rendered) => rendered,
        Err(_) => "{}".to_string(),
    };
    PayloadReply { ok: true, payload }
}
/// Parses a JSON payload, treating a blank (empty or whitespace-only)
/// payload as an empty JSON object instead of an error.
///
/// Non-blank payloads are forwarded untrimmed to `parse_json_payload`.
pub(crate) fn parse_json_payload_allow_empty(payload_json: &str) -> Result<JsonValue, Status> {
    match payload_json.trim() {
        "" => Ok(JsonValue::Object(Map::new())),
        _ => parse_json_payload(payload_json),
    }
}
/// Warm-up scope as seen by the gRPC layer.
///
/// Each variant corresponds one-to-one to the same-named variant of
/// `crate::application::serverless_payload::ServerlessWarmupScopeToken`;
/// `grpc_parse_serverless_warmup_scopes` performs that conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum GrpcServerlessWarmupScope {
/// Converted from `ServerlessWarmupScopeToken::Indexes`.
Indexes,
/// Converted from `ServerlessWarmupScopeToken::GraphProjections`.
GraphProjections,
/// Converted from `ServerlessWarmupScopeToken::AnalyticsJobs`.
AnalyticsJobs,
/// Converted from `ServerlessWarmupScopeToken::NativeArtifacts`.
NativeArtifacts,
}
/// gRPC-facing entry point for parsing serverless readiness requirements.
///
/// Delegates unchanged to
/// `crate::application::serverless_payload::parse_serverless_readiness_requirements`.
pub(crate) fn grpc_parse_serverless_readiness_requirements(
    payload: &JsonValue,
) -> Result<Vec<String>, String> {
    use crate::application::serverless_payload::parse_serverless_readiness_requirements as parse;

    parse(payload)
}
/// gRPC-facing entry point for parsing serverless reclaim operations.
///
/// Delegates unchanged to
/// `crate::application::serverless_payload::parse_serverless_reclaim_operations`.
pub(crate) fn grpc_parse_serverless_reclaim_operations(
    payload: &JsonValue,
) -> Result<Vec<String>, String> {
    use crate::application::serverless_payload::parse_serverless_reclaim_operations as parse;

    parse(payload)
}
/// Parses the warm-up scopes from a serverless payload and converts each
/// application-level token into its gRPC-level counterpart.
///
/// Parse errors from the application layer are returned unchanged.
pub(crate) fn grpc_parse_serverless_warmup_scopes(
    payload: &JsonValue,
) -> Result<Vec<GrpcServerlessWarmupScope>, String> {
    use crate::application::serverless_payload::{
        parse_serverless_warmup_scopes, ServerlessWarmupScopeToken as Token,
    };

    let tokens = parse_serverless_warmup_scopes(payload)?;
    Ok(tokens
        .into_iter()
        .map(|token| match token {
            Token::Indexes => GrpcServerlessWarmupScope::Indexes,
            Token::GraphProjections => GrpcServerlessWarmupScope::GraphProjections,
            Token::AnalyticsJobs => GrpcServerlessWarmupScope::AnalyticsJobs,
            Token::NativeArtifacts => GrpcServerlessWarmupScope::NativeArtifacts,
        })
        .collect())
}
pub(crate) fn grpc_serverless_readiness_summary_to_json(
query_ready: bool,
write_ready: bool,
repair_ready: bool,
health: &crate::health::HealthReport,
authority: &crate::storage::unified::devx::PhysicalAuthorityStatus,
) -> JsonValue {
crate::presentation::serverless_json::serverless_readiness_summary_json(
query_ready,
write_ready,
repair_ready,
health,
authority,
crate::presentation::ops_json::health_json,
crate::presentation::ops_json::physical_authority_status_json,
)
}
/// Deployment profile as seen by the gRPC layer.
///
/// Each variant corresponds one-to-one to the same-named variant of
/// `crate::application::serverless_payload::DeploymentProfileToken`;
/// `grpc_deployment_profile_from_token` performs that conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum GrpcDeploymentProfile {
/// Converted from `DeploymentProfileToken::Embedded`.
Embedded,
/// Converted from `DeploymentProfileToken::Server`.
Server,
/// Converted from `DeploymentProfileToken::Serverless`.
Serverless,
}
/// Resolves a textual deployment-profile token to its gRPC-level enum.
///
/// Returns `None` when the application-layer parser does not recognise
/// `value`.
pub(crate) fn grpc_deployment_profile_from_token(value: &str) -> Option<GrpcDeploymentProfile> {
    use crate::application::serverless_payload::{
        deployment_profile_from_token, DeploymentProfileToken as Token,
    };

    let profile = match deployment_profile_from_token(value)? {
        Token::Embedded => GrpcDeploymentProfile::Embedded,
        Token::Server => GrpcDeploymentProfile::Server,
        Token::Serverless => GrpcDeploymentProfile::Serverless,
    };
    Some(profile)
}