use std::time::{SystemTime, UNIX_EPOCH};
use sha2::{Digest, Sha256};
use tracing::warn;
use super::wire::DeltaPushMsg;
use crate::control::security::audit::{AuditEvent, AuditLog};
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::security::jwt::{JwtConfig, JwtError, JwtValidator};
use crate::control::security::rls::RlsPolicyStore;
use crate::control::security::util::base64_url_decode;
#[derive(Debug)]
pub enum UpgradeAuthResult {
Authenticated {
identity: AuthenticatedIdentity,
expires_in_secs: u64,
},
Rejected { reason: String },
}
pub fn validate_upgrade_token(token: &str, config: &JwtConfig) -> UpgradeAuthResult {
let validator = JwtValidator::new(config.clone());
match validator.validate(token) {
Ok(identity) => {
let expires_in = extract_exp_from_token(token).unwrap_or(0);
let now = now_epoch_secs();
let remaining = expires_in.saturating_sub(now);
UpgradeAuthResult::Authenticated {
identity,
expires_in_secs: remaining,
}
}
Err(e) => UpgradeAuthResult::Rejected {
reason: e.to_string(),
},
}
}
pub fn check_token_refresh_needed(
token: &str,
config: &JwtConfig,
shared: &crate::control::state::SharedState,
) -> Option<u64> {
let token_refresh_window_secs = shared.tuning.network.token_refresh_window_secs;
let validator = JwtValidator::new(config.clone());
match validator.validate(token) {
Ok(_) => {
let exp = extract_exp_from_token(token).unwrap_or(0);
if exp == 0 {
return None; }
let now = now_epoch_secs();
let remaining = exp.saturating_sub(now);
if remaining <= token_refresh_window_secs {
Some(remaining)
} else {
None
}
}
Err(JwtError::Expired) => Some(0), Err(_) => None, }
}
fn extract_exp_from_token(token: &str) -> Option<u64> {
let parts: Vec<&str> = token.split('.').collect();
if parts.len() != 3 {
return None;
}
let payload = base64_url_decode(parts[1])?;
let claims: serde_json::Value = serde_json::from_slice(&payload).ok()?;
claims.get("exp")?.as_u64()
}
#[derive(Debug, Clone)]
pub enum SyncRejectionReason {
RlsPolicyViolation { policy_name: String },
InsufficientPermission,
RateLimited { retry_after_ms: u64 },
TokenExpired,
}
impl std::fmt::Display for SyncRejectionReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RlsPolicyViolation { policy_name } => {
write!(f, "RLS policy '{policy_name}' rejected")
}
Self::InsufficientPermission => write!(f, "insufficient write permission"),
Self::RateLimited { retry_after_ms } => {
write!(f, "rate limited (retry after {retry_after_ms}ms)")
}
Self::TokenExpired => write!(f, "token expired"),
}
}
}
pub fn enforce_rls_on_delta(
delta: &DeltaPushMsg,
identity: &AuthenticatedIdentity,
rls_store: &RlsPolicyStore,
) -> Result<(), SyncRejectionReason> {
let tenant_id = identity.tenant_id.as_u32();
let write_policies = rls_store.write_policies(tenant_id, &delta.collection);
if write_policies.is_empty() {
return Ok(()); }
let doc_value = delta_to_document_value(&delta.delta);
for policy in &write_policies {
if policy.predicate.is_empty() {
continue; }
let filters: Vec<crate::bridge::scan_filter::ScanFilter> =
match rmp_serde::from_slice(&policy.predicate) {
Ok(f) => f,
Err(_) => continue, };
let passes = filters.iter().all(|f| f.matches(&doc_value));
if !passes {
return Err(SyncRejectionReason::RlsPolicyViolation {
policy_name: policy.name.clone(),
});
}
}
Ok(())
}
pub fn log_silent_rejection(
audit_log: &mut AuditLog,
session_id: &str,
identity: &AuthenticatedIdentity,
delta: &DeltaPushMsg,
reason: &SyncRejectionReason,
) {
let delta_hash = sha256_hex(&delta.delta);
let detail = format!(
"sync silent reject: session={}, user={}, tenant={}, collection={}, doc={}, mutation_id={}, reason={}, delta_hash={}, delta_len={}",
session_id,
identity.username,
identity.tenant_id.as_u32(),
delta.collection,
delta.document_id,
delta.mutation_id,
reason,
delta_hash,
delta.delta.len(),
);
let event = match reason {
SyncRejectionReason::RlsPolicyViolation { .. }
| SyncRejectionReason::InsufficientPermission => AuditEvent::AuthzDenied,
SyncRejectionReason::RateLimited { .. } | SyncRejectionReason::TokenExpired => {
AuditEvent::AuthzDenied
}
};
audit_log.record(event, Some(identity.tenant_id), session_id, &detail);
warn!(
session = session_id,
user = %identity.username,
collection = %delta.collection,
doc = %delta.document_id,
mutation_id = delta.mutation_id,
reason = %reason,
"sync delta silently rejected"
);
}
fn delta_to_document_value(delta_bytes: &[u8]) -> serde_json::Value {
if let Ok(value) = rmp_serde::from_slice::<serde_json::Value>(delta_bytes) {
return value;
}
if let Ok(value) = serde_json::from_slice::<serde_json::Value>(delta_bytes) {
return value;
}
serde_json::Value::Object(serde_json::Map::new())
}
fn sha256_hex(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
format!("{:x}", hasher.finalize())
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::control::security::rls::{PolicyType, RlsPolicy};
use crate::types::TenantId;
fn test_identity(tenant: u32, username: &str) -> AuthenticatedIdentity {
AuthenticatedIdentity {
user_id: 1,
username: username.into(),
tenant_id: TenantId::new(tenant),
auth_method: crate::control::security::identity::AuthMethod::ApiKey,
roles: vec![crate::control::security::identity::Role::ReadWrite],
is_superuser: false,
}
}
fn make_delta(collection: &str, doc_id: &str, data: &serde_json::Value) -> DeltaPushMsg {
DeltaPushMsg {
collection: collection.into(),
document_id: doc_id.into(),
delta: rmp_serde::to_vec_named(data).unwrap(),
peer_id: 1,
mutation_id: 42,
checksum: 0,
}
}
#[test]
fn rls_allows_when_no_policies() {
let store = RlsPolicyStore::new();
let identity = test_identity(1, "alice");
let delta = make_delta("orders", "o1", &serde_json::json!({"status": "active"}));
assert!(enforce_rls_on_delta(&delta, &identity, &store).is_ok());
}
#[test]
fn rls_allows_matching_delta() {
let store = RlsPolicyStore::new();
let filter = crate::bridge::scan_filter::ScanFilter {
field: "status".into(),
op: "eq".into(),
value: serde_json::json!("active"),
clauses: Vec::new(),
};
let predicate = rmp_serde::to_vec_named(&vec![filter]).unwrap();
store
.create_policy(RlsPolicy {
name: "require_active".into(),
collection: "orders".into(),
tenant_id: 1,
policy_type: PolicyType::Write,
predicate,
compiled_predicate: None,
mode: crate::control::security::predicate::PolicyMode::default(),
on_deny: Default::default(),
enabled: true,
created_by: "admin".into(),
created_at: 0,
})
.unwrap();
let identity = test_identity(1, "alice");
let delta = make_delta("orders", "o1", &serde_json::json!({"status": "active"}));
assert!(enforce_rls_on_delta(&delta, &identity, &store).is_ok());
}
#[test]
fn rls_rejects_non_matching_delta() {
let store = RlsPolicyStore::new();
let filter = crate::bridge::scan_filter::ScanFilter {
field: "status".into(),
op: "eq".into(),
value: serde_json::json!("active"),
clauses: Vec::new(),
};
let predicate = rmp_serde::to_vec_named(&vec![filter]).unwrap();
store
.create_policy(RlsPolicy {
name: "require_active".into(),
collection: "orders".into(),
tenant_id: 1,
policy_type: PolicyType::Write,
predicate,
compiled_predicate: None,
mode: crate::control::security::predicate::PolicyMode::default(),
on_deny: Default::default(),
enabled: true,
created_by: "admin".into(),
created_at: 0,
})
.unwrap();
let identity = test_identity(1, "alice");
let delta = make_delta("orders", "o1", &serde_json::json!({"status": "draft"}));
let result = enforce_rls_on_delta(&delta, &identity, &store);
assert!(result.is_err());
if let Err(SyncRejectionReason::RlsPolicyViolation { policy_name }) = result {
assert_eq!(policy_name, "require_active");
} else {
panic!("expected RlsPolicyViolation");
}
}
#[test]
fn silent_rejection_logs_audit() {
let mut audit_log = AuditLog::new(100);
let identity = test_identity(1, "alice");
let delta = make_delta("orders", "o1", &serde_json::json!({"x": 1}));
let reason = SyncRejectionReason::RlsPolicyViolation {
policy_name: "test_policy".into(),
};
log_silent_rejection(&mut audit_log, "sess-1", &identity, &delta, &reason);
assert_eq!(audit_log.len(), 1);
let entry = &audit_log.all()[0];
assert_eq!(entry.event, AuditEvent::AuthzDenied);
assert!(entry.detail.contains("test_policy"));
assert!(entry.detail.contains("alice"));
assert!(entry.detail.contains("orders"));
assert!(entry.detail.contains("delta_hash="));
}
#[test]
fn upgrade_rejects_bad_token() {
let config = JwtConfig::default();
let result = validate_upgrade_token("bad.token.here", &config);
assert!(matches!(result, UpgradeAuthResult::Rejected { .. }));
}
#[test]
fn delta_to_document_handles_opaque_bytes() {
let val = delta_to_document_value(&[0x80, 0x80, 0x80, 0x80, 0x80]);
assert!(val.is_object());
assert!(val.as_object().unwrap().is_empty());
let data = serde_json::json!({"key": "value"});
let msgpack = rmp_serde::to_vec_named(&data).unwrap();
let val = delta_to_document_value(&msgpack);
assert_eq!(val["key"], "value");
}
#[test]
fn sha256_hex_deterministic() {
let h1 = sha256_hex(b"hello");
let h2 = sha256_hex(b"hello");
assert_eq!(h1, h2);
assert_eq!(h1.len(), 64);
let h3 = sha256_hex(b"world");
assert_ne!(h1, h3);
}
}